23 Java - Stream (Java8)

Chetan Datta

Stream

  • A stream can be thought of as a pipeline through which the elements of a collection pass.

  • As the elements pass through the pipeline, operations such as sorting and filtering are applied to them.

  • Streams are useful for bulk processing, and they can process elements in parallel.

Architecture

Example

import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        List<Integer> salaryList = new ArrayList<>();

        salaryList.add(3000);
        salaryList.add(4100);
        salaryList.add(9000);
        salaryList.add(1000);
        salaryList.add(3500);

        int count = 0;
        for (Integer salary : salaryList){
            if (salary > 3000){
                count++;
            }
        }

        System.out.println("Total Employee with salary > 3000: "+count);
    }
}
// Total Employee with salary > 3000: 3

The same count written with a stream:

import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        List<Integer> salaryList = new ArrayList<>();

        salaryList.add(3000);
        salaryList.add(4100);
        salaryList.add(9000);
        salaryList.add(1000);
        salaryList.add(3500);

        long output = salaryList.stream().filter((Integer sal) -> sal > 3000).count();
        System.out.println("Total Employee with salary > 3000: "+output);

    }
}
//Total Employee with salary > 3000: 3

Different ways to create a Stream

1. From Collection

List<Integer> salaryList = Arrays.asList(3000, 4100, 9000, 1000, 3500);
Stream<Integer> streamFromIntegerList = salaryList.stream();

2. From Array

Integer[] salaryArray = {3000, 4100, 9000, 1000, 3500};
Stream<Integer> streamFromIntegerArray = Arrays.stream(salaryArray);

3. From Static Method

Stream<Integer> streamFromStaticMethod = Stream.of(1000,3500,4000,9000);

4. From Stream Builder

Stream.Builder<Integer> streamBuilder = Stream.builder();
streamBuilder.add(1000).add(9000).add(3500);

Stream<Integer> streamFromStreamBuilder = streamBuilder.build();

5. From Stream Iterate

Stream<Integer> streamFromIterate = Stream.iterate(1000, (Integer n) -> n + 5000).limit(5);
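To see what the builder and iterate variants actually produce, here is a minimal sketch that collects both streams into lists. The class name StreamCreationDemo and its method names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamCreationDemo {

    // Stream.iterate starts at the seed and keeps applying the function;
    // limit(5) is what makes this otherwise infinite stream finite.
    static List<Integer> fromIterate() {
        return Stream.iterate(1000, n -> n + 5000)
                .limit(5)
                .collect(Collectors.toList());
    }

    // Stream.builder accepts elements one by one and then builds the stream.
    static List<Integer> fromBuilder() {
        Stream.Builder<Integer> builder = Stream.builder();
        builder.add(1000).add(9000).add(3500);
        return builder.build().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(fromIterate()); // [1000, 6000, 11000, 16000, 21000]
        System.out.println(fromBuilder()); // [1000, 9000, 3500]
    }
}
```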

Different Intermediate Operations

We can chain multiple intermediate operations together to perform more complex processing before applying a terminal operation to produce the result.
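As a rough sketch of such a chain, the example below filters, transforms, and sorts the salary list from earlier before a terminal collect gathers the result. The class ChainingDemo, the method raisedSalaries, and the 10% raise are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ChainingDemo {

    // Keeps salaries above 3000, adds a 10% raise, and sorts the result.
    static List<Integer> raisedSalaries(List<Integer> salaries) {
        return salaries.stream()
                .filter(salary -> salary > 3000)      // intermediate operation
                .map(salary -> salary + salary / 10)  // intermediate operation
                .sorted()                             // intermediate operation
                .collect(Collectors.toList());        // terminal operation
    }

    public static void main(String[] args) {
        List<Integer> salaryList = Arrays.asList(3000, 4100, 9000, 1000, 3500);
        System.out.println(raisedSalaries(salaryList)); // [3850, 4510, 9900]
    }
}
```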

1. filter(Predicate<T> predicate)

Filters the elements, keeping only those that match the predicate.

Stream<String> nameStream = Stream.of("HELLO", "EVERYBODY","HOW","ARE","YOU","DOING");
Stream<String> filteredStream = nameStream.filter((String name) -> name.length()<=3);

List<String> filteredNameList = filteredStream.collect(Collectors.toList());

// [HOW, ARE, YOU]

2. map(Function<T,R> mapper)

Used to transform each element.

Stream<String> nameStream = Stream.of("HELLO", "EVERYBODY","HOW","ARE","YOU","DOING");
Stream<String> lowerCaseNames = nameStream.map((String name) -> name.toLowerCase());
// Collected to a list: [hello, everybody, how, are, you, doing]

3. flatMap(Function<T,Stream<R>> mapper)

Used to iterate over each element of a complex (nested) collection and flatten it into a single stream.

List<List<String>> sentenceList = Arrays.asList(
        Arrays.asList("I","LOVE","JAVA"),
        Arrays.asList("CONCEPTS","ARE","CLEAR"),
        Arrays.asList("ITS","VERY","EASY")
);

Stream<String> wordsStream1 = sentenceList.stream().flatMap((List<String> sentence) -> sentence.stream());
System.out.println(wordsStream1.collect(Collectors.toList()));

Stream<String> wordsStream2 = sentenceList.stream().flatMap((List<String> sentence) -> sentence.stream().map((String value)-> value.toLowerCase()));
System.out.println(wordsStream2.collect(Collectors.toList()));

/*
[I, LOVE, JAVA, CONCEPTS, ARE, CLEAR, ITS, VERY, EASY]
[i, love, java, concepts, are, clear, its, very, easy]
*/

4. distinct()

Removes duplicates from the stream.

Integer[] arr = {1,5,2,7,4,4,2,0,9};

Stream<Integer> arrStream = Arrays.stream(arr).distinct();
System.out.println(arrStream.collect(Collectors.toList()));

//[1, 5, 2, 7, 4, 0, 9]

5. sorted()

Sorts the elements

Integer[] arr = {1,5,2,7,4,4,2,0,9};

Stream<Integer> arrStream = Arrays.stream(arr).sorted();
System.out.println(arrStream.collect(Collectors.toList()));

//[0, 1, 2, 2, 4, 4, 5, 7, 9]

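// Descending order with a custom comparator (a new variable name avoids redeclaring arrStream)
Stream<Integer> descendingStream = Arrays.stream(arr).sorted((Integer val1, Integer val2) -> val2-val1);
System.out.println(descendingStream.collect(Collectors.toList()));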

//[9, 7, 5, 4, 4, 2, 2, 1, 0]
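A subtraction-based comparator such as val2-val1 works for small values but can overflow for values near the int limits. One possible alternative is the built-in Comparator.reverseOrder(), sketched below; the class SortedDemo and the method descending are illustrative names.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class SortedDemo {

    // Sorts in descending order without relying on integer subtraction.
    static List<Integer> descending(Integer[] values) {
        return Arrays.stream(values)
                .sorted(Comparator.reverseOrder())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Integer[] arr = {1, 5, 2, 7, 4, 4, 2, 0, 9};
        System.out.println(descending(arr)); // [9, 7, 5, 4, 4, 2, 2, 1, 0]

        // 1 - Integer.MIN_VALUE overflows, so a subtraction comparator would misorder this pair.
        Integer[] extremes = {Integer.MIN_VALUE, 1};
        System.out.println(descending(extremes)); // [1, -2147483648]
    }
}
```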

6. peek(Consumer<T> action)

Helps you to see the intermediate result of the stream which is getting processed.

List<Integer> numbers = Arrays.asList(2,1,3,4,6);
Stream<Integer> numberStream = numbers.stream()
        .filter((Integer val) -> val>2)
        .peek((Integer val) -> System.out.println(val)) //it will print 3,4,6
        .map((Integer val)->-1*val);

List<Integer> numberList = numberStream.collect(Collectors.toList());

7. limit(long maxSize)

Truncates the stream so that it is no longer than the given maxSize.

List<Integer> numbers = Arrays.asList(2,1,3,4,6);
Stream<Integer> numberStream = numbers.stream()
        .limit(3);

System.out.println(numberStream.collect(Collectors.toList()));
//[2, 1, 3]

8. skip(long n)

Skips the first n elements of the stream.

List<Integer> numbers = Arrays.asList(2,1,3,4,6);
Stream<Integer> numberStream = numbers.stream().skip(3);
System.out.println(numberStream.collect(Collectors.toList()));
//[4, 6]

9. mapToInt(ToIntFunction<T> mapper)

Helps to work with primitive "int" data types

Example 1:

List<String> numbers = Arrays.asList("2","1","4","7");
IntStream numberStream = numbers.stream().mapToInt((String val)->Integer.parseInt(val));

int[] numberArray = numberStream.toArray();
//Output: 2, 1, 4, 7

Example 2 (incorrect usage):

int[] numbersArray = {2, 1, 4, 7};
IntStream numbersStream = Arrays.stream(numbersArray);
numbersStream.filter((int val)->val>2); // returns a new stream, which is discarded here
int[] filteredArray = numbersStream.toArray(); // numbersStream is already linked to filter, so this throws

Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:246)
    at java.base/java.util.stream.IntPipeline.toArray(IntPipeline.java:562)
    at learn.collection.Main.main(Main.java:22)

Example 3 (correct usage, with the calls chained):

int[] numbersArray = {2, 1, 4, 7};
IntStream numbersStream = Arrays.stream(numbersArray);
int[] output = numbersStream.filter((int val)->val>2).toArray();
//4 7

10. mapToLong(ToLongFunction<T> mapper)

List<String> numbers = Arrays.asList("2","1","4","7");
LongStream numberStream = numbers.stream().mapToLong((String val)->Long.parseLong(val));

long[] numberArray = numberStream.toArray();
//Output: 2, 1, 4, 7

long[] numbersArray = {2, 1, 4, 7};
LongStream numbersStream = Arrays.stream(numbersArray);
long[] output = numbersStream.filter((long val)->val>2).toArray();
//Output: 4,7

11. mapToDouble(ToDoubleFunction<T> mapper)

List<String> numbers = Arrays.asList("2","1","4","7");
DoubleStream numberStream = numbers.stream().mapToDouble((String val)->Double.parseDouble(val));

double[] numberArray = numberStream.toArray();
//Output: 2, 1, 4, 7

double[] numbersArray = {2, 1, 4, 7};
DoubleStream numbersStream = Arrays.stream(numbersArray);
double[] output = numbersStream.filter((double val)->val>2).toArray();
//Output: 4,7
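One reason to switch to a primitive stream is that it offers numeric operations such as sum() and summaryStatistics() directly. The sketch below shows this for IntStream; the class PrimitiveStreamDemo and its method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;

class PrimitiveStreamDemo {

    // Parses each string to an int and adds the values up.
    static int total(List<String> numbers) {
        return numbers.stream().mapToInt(Integer::parseInt).sum();
    }

    // Gathers count, min, max, sum and average in a single pass.
    static IntSummaryStatistics stats(List<String> numbers) {
        return numbers.stream().mapToInt(Integer::parseInt).summaryStatistics();
    }

    public static void main(String[] args) {
        List<String> numbers = Arrays.asList("2", "1", "4", "7");
        System.out.println(total(numbers)); // 14

        IntSummaryStatistics summary = stats(numbers);
        System.out.println(summary.getMin());     // 1
        System.out.println(summary.getMax());     // 7
        System.out.println(summary.getAverage()); // 3.5
    }
}
```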

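Why do we call intermediate operations "lazy"?

Intermediate operations do not run when they are declared. They are executed only when a terminal operation is invoked on the stream.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Stream<Integer> numberStream = numbers.stream().filter((Integer val) -> val>=3).peek((Integer val) -> System.out.println(val));

Output: nothing is printed, because no terminal operation has been invoked.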

List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Stream<Integer> numberStream = numbers.stream().filter((Integer val) -> val>=3).peek((Integer val) -> System.out.println(val));
numberStream.count(); // count() is a terminal operation, so the pipeline runs now
/*
4
7
10
*/
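Laziness can also be checked without reading console output. The sketch below counts how often the filter predicate is called before and after a terminal operation; the class LazyDemo and the method predicateCalls are illustrative names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class LazyDemo {

    // Returns {calls before the terminal operation, calls after it}.
    static int[] predicateCalls() {
        List<Integer> numbers = Arrays.asList(2, 1, 4, 7, 10);
        AtomicInteger calls = new AtomicInteger();

        // Declaring the pipeline does not run the predicate.
        Stream<Integer> pipeline = numbers.stream()
                .filter(val -> {
                    calls.incrementAndGet();
                    return val >= 3;
                });
        int before = calls.get();

        // forEach is a terminal operation, so every element is now tested.
        pipeline.forEach(val -> { });
        int after = calls.get();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(predicateCalls())); // [0, 5]
    }
}
```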

Sequence of Stream Operations

public class Main {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2,1,4,7,10);
        Stream<Integer> numberStream = numbers.stream()
                .filter((Integer val) -> val>=3)
                .peek((Integer val) -> System.out.println("after filter: "+val))
                .map((Integer val) -> (val*-1))
                .peek((Integer val) -> System.out.println("after negating:"+ val))
                .sorted()
                .peek((Integer val)-> System.out.println("after Sorted:"+val));

        List<Integer> filteredNumberStream = numberStream.collect(Collectors.toList());
    }
}

Expected Output

after filter: 4
after filter: 7
after filter: 10

after negating:-4
after negating:-7
after negating:-10

after Sorted:-10
after Sorted:-7
after Sorted:-4

Actual Output

after filter: 4
after negating:-4
after filter: 7
after negating:-7
after filter: 10
after negating:-10
after Sorted:-10
after Sorted:-7
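after Sorted:-4

The stream pushes each element through filter, peek, and map before moving on to the next element, rather than finishing one operation for all elements first. sorted() is a stateful operation: it has to see every element before it can emit any, so the steps after it run only once all elements have reached it.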
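The same ordering can be captured in a list instead of being printed, which makes it easier to inspect. This is a minimal sketch for a sequential stream; the class OrderDemo and the event labels are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class OrderDemo {

    // Records the order in which the pipeline stages see each element.
    static List<String> trace() {
        List<String> events = new ArrayList<>();
        Arrays.asList(2, 1, 4, 7, 10).stream()
                .filter(val -> val >= 3)
                .peek(val -> events.add("filter:" + val))
                .map(val -> -val)
                .peek(val -> events.add("map:" + val))
                .sorted()   // stateful: waits for all elements
                .peek(val -> events.add("sorted:" + val))
                .collect(Collectors.toList());
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace());
        // [filter:4, map:-4, filter:7, map:-7, filter:10, map:-10,
        //  sorted:-10, sorted:-7, sorted:-4]
    }
}
```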

Different Terminal Operations

Terminal operations are the ones that produce the result. A terminal operation triggers the processing of the stream.

1. forEach(Consumer<T> action)

Performs an action on each element of the stream. It does not return any value.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);
numbers.stream().filter((Integer val) -> val>=3)
        .forEach((Integer val)-> System.out.println(val));
//Output 4, 7, 10

2. toArray()

Collects the elements of the stream into an Array

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

Object[] filteredNumberArrType1 = numbers.stream()
        .filter((Integer val)-> val>=3)
        .toArray();

Integer[] filteredNumberArrType2 = numbers.stream()
        .filter((Integer val)->val>=3)
        .toArray((int size) -> new Integer[size]);

3. reduce(BinaryOperator<T> accumulator)

Performs a reduction on the elements of the stream, combining them with an associative accumulator function.

public class Main {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2,1,4,7,10);

        Optional<Integer> reducedValue = numbers.stream()
                .reduce((Integer val1, Integer val2)-> val1+val2);

        System.out.println(reducedValue.get());
        //output: 24

    }
}
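Calling get() on the Optional fails with NoSuchElementException when the stream is empty. A possible alternative is the two-argument reduce, which takes an identity value and returns a plain result. The class ReduceDemo and the method sum below are illustrative names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ReduceDemo {

    // The identity 0 is the starting value and also the result for an empty stream.
    static int sum(List<Integer> numbers) {
        return numbers.stream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(2, 1, 4, 7, 10))); // 24
        System.out.println(sum(Collections.emptyList()));       // 0
    }
}
```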

4. collect(Collector<T,A,R> collector)

Can be used to collect the elements of the stream into a List.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);


List<Integer> filteredNumber = numbers.stream()
            .filter((Integer val)->val>=3)
            .collect(Collectors.toList());
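collect is not limited to lists. As a sketch, the example below uses two other collectors from the Collectors class, joining and partitioningBy. The class CollectDemo and its method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CollectDemo {

    // Joins the values that are at least 3 into one comma-separated string.
    static String joined(List<Integer> numbers) {
        return numbers.stream()
                .filter(val -> val >= 3)
                .map(String::valueOf)
                .collect(Collectors.joining(", "));
    }

    // Splits the values into two lists, keyed by the predicate result.
    static Map<Boolean, List<Integer>> partitioned(List<Integer> numbers) {
        return numbers.stream()
                .collect(Collectors.partitioningBy(val -> val >= 3));
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2, 1, 4, 7, 10);
        System.out.println(joined(numbers));                 // 4, 7, 10
        System.out.println(partitioned(numbers).get(true));  // [4, 7, 10]
        System.out.println(partitioned(numbers).get(false)); // [2, 1]
    }
}
```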

5. min(Comparator<T> comparator)

Finds the minimum or maximum element from the stream based on the comparator provided.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

Optional<Integer> minimumValueType1 = numbers.stream()
                            .filter((Integer val)->val>=3)
                            .min((Integer val1, Integer val2)->val1-val2);
System.out.println(minimumValueType1.get());
//4
Optional<Integer> minimumValueType2 = numbers.stream()
        .filter((Integer val)->val>=3)
        .min((Integer val1, Integer val2)-> val2-val1);
System.out.println(minimumValueType2.get());
//10 (the comparator is reversed, so the largest value ranks as the minimum)

6. max(Comparator<T> comparator)

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

Optional<Integer> maximumValueType1 = numbers.stream()
                            .filter((Integer val)->val>=3)
                            .max((Integer val1, Integer val2)->val1-val2);
System.out.println(maximumValueType1.get());
//10
Optional<Integer> maximumValueType2 = numbers.stream()
        .filter((Integer val)->val>=3)
        .max((Integer val1, Integer val2)-> val2-val1);
System.out.println(maximumValueType2.get());
//4 (the comparator is reversed, so the smallest value ranks as the maximum)

7. count()

Returns the number of elements present in the stream.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

long noOfValuesPresent = numbers.stream()
                            .filter((Integer val)->val>=3)
                            .count();
//3

8. anyMatch(Predicate<T> predicate)

Checks whether any value in the stream matches the given predicate and returns a boolean.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

boolean hasValueGreaterThanThree = numbers.stream()
                            .anyMatch((Integer val)->val>3);
//true

9. allMatch(Predicate<T> predicate)

Checks whether all values in the stream match the given predicate and returns a boolean.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

boolean allValuesGreaterThanThree = numbers.stream()
                            .allMatch((Integer val)->val>3);
//false

10. noneMatch(Predicate<T> predicate)

Checks whether no value in the stream matches the given predicate and returns a boolean.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

boolean noValueGreaterThanThree = numbers.stream()
                            .noneMatch((Integer val)->val>3);
//false
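The three match operations behave in a way that is easy to overlook on an empty stream: anyMatch returns false, while allMatch and noneMatch return true. The sketch below compares both cases; the class MatchDemo and the method matchResults are illustrative names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class MatchDemo {

    // Returns {anyMatch, allMatch, noneMatch} for the predicate "val > 3".
    static boolean[] matchResults(List<Integer> numbers) {
        return new boolean[] {
                numbers.stream().anyMatch(val -> val > 3),
                numbers.stream().allMatch(val -> val > 3),
                numbers.stream().noneMatch(val -> val > 3)
        };
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2, 1, 4, 7, 10);
        System.out.println(Arrays.toString(matchResults(numbers))); // [true, false, false]

        List<Integer> empty = Collections.emptyList();
        System.out.println(Arrays.toString(matchResults(empty)));   // [false, true, true]
    }
}
```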

11. findFirst()

Finds the first element of the stream.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

Optional<Integer> firstValue = numbers.stream()
                            .filter((Integer val)->val>=3)
                            .findFirst();
System.out.println(firstValue.get());
//4

12. findAny()

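Returns any element of the stream. The choice is not random, but it is not guaranteed either: the operation is free to pick any element, which matters mostly for parallel streams.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

Optional<Integer> anyValue = numbers.stream()
                            .filter((Integer val)->val>=3)
                            .findAny();
System.out.println(anyValue.get());
//4 (typical for a sequential stream, but not guaranteed)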
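Both findFirst and findAny return an Optional, and calling get() on an empty one throws NoSuchElementException. A minimal sketch of a safer pattern uses orElse with a fallback value. The class FindDemo, the method firstAtLeast, and the fallback of -1 are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

class FindDemo {

    // Returns the first value >= threshold, or the fallback when nothing matches.
    static int firstAtLeast(List<Integer> numbers, int threshold, int fallback) {
        return numbers.stream()
                .filter(val -> val >= threshold)
                .findFirst()
                .orElse(fallback);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2, 1, 4, 7, 10);
        System.out.println(firstAtLeast(numbers, 3, -1));   // 4
        System.out.println(firstAtLeast(numbers, 100, -1)); // -1
    }
}
```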

How many times we can use a single stream?

Once a terminal operation has been used on a stream, the stream is closed/consumed and cannot be used again for another terminal operation.

public class Main {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2,1,4,7,10);

        Stream<Integer> filteredNumbers = numbers.stream()
                .filter((Integer val)->val>=3);

        filteredNumbers.forEach((Integer val) -> System.out.println(val));//consumed the filtered Numbers stream

        //trying to use the closed stream again
        List<Integer> listFromStream = filteredNumbers.collect(Collectors.toList());
    }
}
/*
4
7
10
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at learn.collection.Main.main(Main.java:20)
*/
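When the same pipeline is needed more than once, one common workaround is to wrap its creation in a Supplier so that every call builds a fresh stream. This is a sketch of that idea; the class ReuseDemo and the method filtered are illustrative names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ReuseDemo {

    // Each call to get() on the returned supplier creates a new, unconsumed stream.
    static Supplier<Stream<Integer>> filtered(List<Integer> numbers) {
        return () -> numbers.stream().filter(val -> val >= 3);
    }

    public static void main(String[] args) {
        Supplier<Stream<Integer>> supplier = filtered(Arrays.asList(2, 1, 4, 7, 10));

        // Two terminal operations, each on its own stream instance.
        System.out.println(supplier.get().count());                        // 3
        System.out.println(supplier.get().collect(Collectors.toList()));   // [4, 7, 10]
    }
}
```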

Parallel Stream

  • Helps to perform operations on a stream concurrently, taking advantage of multi-core CPUs.

  • The parallelStream() method is used instead of the regular stream() method.

  • Internally it does:

    • Task splitting: it uses a Spliterator to split the data into multiple chunks.

    • Task submission and parallel processing: it uses the Fork-Join framework.

public class Main {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(11,22,33,44,55,66,77,88,99,110);

        // Sequential Processing
        long sequentialProcessingStartTime = System.currentTimeMillis();
        numbers.stream()
                .map((Integer val)->val*val)
                .forEach((Integer val)-> System.out.println(val));

        System.out.println("Sequential processing time taken: "+ (System.currentTimeMillis()-sequentialProcessingStartTime)+" millisecond");

        // Parallel Processing
        long parallelProcessingStartTime = System.currentTimeMillis();
        numbers.parallelStream()
                .map((Integer val)->val*val)
                .forEach((Integer val)-> System.out.println(val));

        System.out.println("Parallel processing time taken: "+ (System.currentTimeMillis()-parallelProcessingStartTime)+" millisecond");

    }
}

Output:

121
484
1089
1936
3025
4356
5929
7744
9801
12100
Sequential processing time taken: 2 millisecond
5929
4356
9801
12100
7744
484
1089
121
3025
1936
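Parallel processing time taken: 4 millisecond

The parallel run prints the squares in a different order because forEach does not preserve encounter order on a parallel stream. For a list this small, the cost of splitting the work and coordinating threads outweighs the benefit, which is why the parallel run is not faster here.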
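If the order of the results matters, one option is to gather them with collect, which keeps the encounter order of an ordered source such as a List even when the work is done in parallel. The sketch below illustrates this; the class ParallelDemo and the method squaresInOrder are made-up names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ParallelDemo {

    // The squares may be computed on several threads, but the collected list
    // follows the order of the source list.
    static List<Integer> squaresInOrder(List<Integer> numbers) {
        return numbers.parallelStream()
                .map(val -> val * val)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(11, 22, 33, 44, 55, 66, 77, 88, 99, 110);
        System.out.println(squaresInOrder(numbers));
        // [121, 484, 1089, 1936, 3025, 4356, 5929, 7744, 9801, 12100]
    }
}
```

For printing in order directly, forEachOrdered can be used in place of forEach, at the cost of some of the parallel speed-up.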
