29 Java - Multithreading 6 (Future | Callable | CompletableFuture)

Chetan DattaChetan Datta
8 min read

Status of submitted task

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        //New thread will be created, and it will perform the task

        poolExecutor.submit(()->{
            System.out.println("this is the task, which thread will execute");
        });

        //main thread will continue processing
    }
}
  • Now, what if caller want to know the status of the thread1. Whether its completed or failed etc.

  • To know the status of submit we use Future

Future

  • Interface which represents the result of the Async task.

  • Means, it allow you to check if:

    • Computation is complete

    • Ger the result

    • Take care of exception if any

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        //New thread will be created, and it will perform the task

        Future<?> futureObj = poolExecutor.submit(()->{
            System.out.println("this is the task, which thread will execute");
        });

        //caller is checking the status of the thread it created
        System.out.println(futureObj.isDone());

        poolExecutor.shutdown();
    }
}
/*
false
this is the task, which thread will execute
*/

Future Methods

boolean cancel(boolean mayInterruptIfRunning)

  • Attempts to cancel the execution of the task.

  • Returns false, if task can not be cancelled. Typically because task already completed; returns true otherwise.

boolean isCancelled()

  • Returns true, if task was cancelled before it get completed.

boolean isDone()

  • Returns true if this task completed.

  • Completion may be due to normal termination, an exception, or cancellation.

  • In all these cases, this method will return true.

V get()

  • Wait if required, for the completion of the task.

  • After task completed, retrieve the result if available.

V get(long timeout, TimeUnit unit)

  • Wait if required, for at most the given timeout period.

  • Throws TimeoutException if timeout period finished and task is not yet completed.

Future Examples

Example 1

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        //New thread will be created, and it will perform the task

        Future<?> futureObj = poolExecutor.submit(()->{
            try{
                Thread.sleep(7000);
                System.out.println("this is the task, which thread will execute");
            }
            catch (Exception e){
                //handle the exception
            }
        });

        //caller is checking the status of the thread it created
        System.out.println(futureObj.isDone());

        try{
            futureObj.get();
            System.out.println("Finished waiting for the future Object");
        }
        catch (Exception e){

        }
        poolExecutor.shutdown();
    }
}
false
this is the task, which thread will execute
Finished waiting for the future Object

Example 2

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        //New thread will be created, and it will perform the task

        Future<?> futureObj = poolExecutor.submit(()->{
            try{
                Thread.sleep(7000);
                System.out.println("this is the task, which thread will execute");
            }
            catch (Exception e){
                //handle the exception
            }
        });

        //caller is checking the status of the thread it created
        System.out.println(futureObj.isDone());

        try{
            futureObj.get(2, TimeUnit.SECONDS);
            System.out.println("Finished waiting for the future Object");
        }
        catch (TimeoutException e){
            System.out.println("TimeoutException happened");
        }
        catch (Exception e){

        }

        poolExecutor.shutdown();
    }
}
false
TimeoutException happened
this is the task, which thread will execute

Example 3

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        //New thread will be created, and it will perform the task

        Future<?> futureObj = poolExecutor.submit(()->{
            try{
                Thread.sleep(7000);
                System.out.println("this is the task, which thread will execute");
            }
            catch (Exception e){
                //handle the exception
            }
        });

        //caller is checking the status of the thread it created
        System.out.println(futureObj.isDone());

        try{
            futureObj.get(2, TimeUnit.SECONDS);
        }
        catch (TimeoutException e){
            System.out.println("TimeoutException happened");
        }
        catch (Exception e){

        }


        try{
            futureObj.get();
            System.out.println("Finished waiting for the future Object");
        }
        catch (Exception e){

        }

        System.out.println("is Done: "+ futureObj.isDone());
        System.out.println("is Cancelled: "+futureObj.isCancelled());

        poolExecutor.shutdown();
    }
}
false
TimeoutException happened
this is the task, which thread will execute
Finished waiting for the future Object
is Done: true
is Cancelled: false
  • FutureTask is the child of Future interface

  • FutureTask is a wrapper of Runnable and State of the thread.

Callable

  • Callable represents the task which need to be executed just like Runnable

  • But difference is:

    • Runnable do not have any Return type.

    • Callable has the capability to return the value

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        Future<Integer> futureObj = poolExecutor.submit(() -> {
            System.out.println("do something");
            return 45;
        });

        poolExecutor.shutdown();
    }
}

Runnable interface

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

Callable interface

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

ThreadPoolExecutor submit types

Example:

public class Main {
    public static void main(String[] args) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

        //UseCase1 --> Runnable
        //Runnable has no return type
        //To know the future object we are using the wild card
        //Internally it puts Void
        Future<?> futureObj1 = poolExecutor.submit(() -> {
            System.out.println("Task1 with Runnable");
        });

        try{
            Object obj = futureObj1.get();
            System.out.println(obj==null);
        }
        catch (Exception e){

        }

        //UseCase2
        List<Integer> output = new ArrayList<>();
        Future<List<Integer>> futureObj2 = poolExecutor.submit(() -> {
            output.add(100);
            System.out.println("do something");
            return output;
        });

        try{
            List<Integer> outputFromFutureObj2 = futureObj2.get();
            System.out.println(outputFromFutureObj2);
        }
        catch (Exception e){

        }

        //UseCase3
        Future<List<Integer>> futureObj3 = poolExecutor.submit(() -> {
            System.out.println("Task3 with Callable");
            List<Integer> listObj = new ArrayList<>();
            listObj.add(200);
            return listObj;
        });

        try{
            List<Integer> outputFromFutureObject3 = futureObj3.get();
            System.out.println(outputFromFutureObject3);
        }
        catch (Exception e){

        }

        poolExecutor.shutdown();
    }
}
something
true
do something
[100]
Task3 with Callable
[200]

submit(Runnable)

        Future<?> futureObj1 = poolExecutor.submit(() -> {
            System.out.println("Task1 with Runnable");
        });

        try{
            Object obj = futureObj1.get();
            System.out.println(obj==null);
        }
        catch (Exception e){

        }
//true

submit(Runnable, T)

        //UseCase2
        List<Integer> output = new ArrayList<>();
        Future<List<Integer>> futureObj2 = poolExecutor.submit(() -> {
            output.add(100);
            System.out.println("do something");
            return output;
        });

        try{
            List<Integer> outputFromFutureObj2 = futureObj2.get();
            System.out.println(outputFromFutureObj2);
        }
        catch (Exception e){

        }

Or

//UseCase2
List<Integer> output = new ArrayList<>();
Future<List<Integer>> futureObj2 = poolExecutor.submit(new MyRunnable(output), output);
public class MyRunnable implements Runnable{

    List<Integer> output;

    public MyRunnable(List<Integer> output) {
        this.output = output;
    }

    @Override
    public void run() {
        output.add(100);
        System.out.println("do something");
    }
}

submit(Callable<T>)

        //UseCase3
        Future<List<Integer>> futureObj3 = poolExecutor.submit(() -> {
            System.out.println("Task3 with Callable");
            List<Integer> listObj = new ArrayList<>();
            listObj.add(200);
            return listObj;
        });

        try{
            List<Integer> outputFromFutureObject3 = futureObj3.get();
            System.out.println(outputFromFutureObject3);
        }
        catch (Exception e){

        }

CompletableFuture

  • Introduced in Java8

  • To help in async programming

  • We can considered it as advanced version of Future

  • Provides additional capability like chaining.

  • Future is an interface and it's child is CompletableFuture

CompletableFuture Methods

supplyAsync

  1. public static<T> CompletableFuture<T> supplyAsync(Supplier<T> supplier)

  2. public static<T> CompletableFuture<T> supplyAsync(Supplier<T> supplier, Executor executor)

  • supplyAsync method initiates an Async operation. Means it internally creates a new thread.

  • 'supplier' is executed asynchronously in a separate new thread

  • If we want more control on Threads, we can pass Executor in the method

  • By default it uses, shared Fork-Join Pool executor. It dynamically adjust its pool size based on processors.

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());


        CompletableFuture<String> asyncTask1 = CompletableFuture.supplyAsync(() -> {
            //this is the task which need to be completed by thread
            return "task completed";
        }, poolExecutor);
        System.out.println(asyncTask1.get());

        poolExecutor.shutdown();
    }
}

thenApply & thenApplyAsync

  • Apply a function to the result of previous Async computation

  • Return a new CompletableFuture object.

thenApply

  • It's a synchronous execution

  • Means, it uses same thread which completed the previous Async task.

Example 1

CompletableFuture<String> asyncTask1 = CompletableFuture.supplyAsync(() -> {
    //this is the task which need to be completed by thread
    return "task completed";
}, poolExecutor).thenApply((String val)->{
    //Functionality which can work on the result of the previous async task
    return val + " in thenApply";
});
System.out.println(asyncTask1.get());

Example 2:

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());


        CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("ThreadName of SupplyAsync: "+ Thread.currentThread().getName());
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                //handle exception
            }
            return "Supply Task1 ";
        }, poolExecutor);

        CompletableFuture<String> compFutureObj2 = compFutureObj1.thenApply((String val) -> {
            System.out.println("ThreadName of thenApply: " + Thread.currentThread().getName());
            return "And ";
        });

        System.out.println("Thread Name for 'after Completable Future: " + Thread.currentThread().getName());


        poolExecutor.shutdown();
    }
}
Thread Name for 'after Completable Future: main
ThreadName of SupplyAsync: pool-1-thread-1
ThreadName of thenApply: pool-1-thread-1

thenApplyAsync

  • It's a Asynchronous execution

  • Means, it uses different thread (from fork-join pool, if we do not provide the executor in the method), to complete this function.

  • If multiple thenApplyAsync is used, ordering cannot be guarantee, they will run concurrently.

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1,1,10, TimeUnit.HOURS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());


        CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("ThreadName of SupplyAsync: "+ Thread.currentThread().getName());
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                //handle exception
            }
            return "Supply Task1 ";
        }, poolExecutor);

        CompletableFuture<String> compFutureObj2 = compFutureObj1.thenApplyAsync((String val) -> {
            System.out.println("ThreadName of thenApplyAsync: " + Thread.currentThread().getName());
            return val + "And ";
        });

        System.out.println("Thread Name for 'after Completable Future: " + Thread.currentThread().getName());

        System.out.println("Async output : "+ compFutureObj2.get());

        poolExecutor.shutdown();
    }
}

thenCompose & thenComposeAsync

  • Chain together dependent Async operations

  • Means when next Async operation depends on the result of the previous Async one.

  • we can tied them together.

  • For async tasks, we can bring some ordering using this

thenCompose

CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
    return "Hello";
}, poolExecutor);

CompletableFuture<String> compFutureObj2 = compFutureObj1.thenCompose((String val) -> {
    return CompletableFuture.supplyAsync(() -> val + " world");
});

System.out.println(compFutureObj2.get());

/*
Hello World
*/

theComposeAsync

CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
    return "Hello";
}, poolExecutor)
        .thenComposeAsync((String val) -> {
    return CompletableFuture.supplyAsync(() -> val + " world");
})
        .thenComposeAsync((String val)->{
            return CompletableFuture.supplyAsync(() -> val+ " all");
        })
        ;

System.out.println(compFutureObj1.get());
/*
Hello world all
*/

thenAccept & thenAcceptAsync

  • Generally end stage, in the chain of Sync operations

  • It does not return anything.

CompletableFuture<String> compFutureObj1 = CompletableFuture.supplyAsync(() -> {
    return "Hello";
}, poolExecutor);

CompletableFuture<Void> acceptObj = compFutureObj1.thenAccept((String val) -> System.out.println("All stages completed"));

thenCombine & thenCombineAsync

  • Used to combine the result of 2 Comparable Future

  • Async means it creates/uses separate new thread.

CompletableFuture<Integer> asyncTask1 = CompletableFuture.supplyAsync(() -> {
    return 10;
}, poolExecutor);

CompletableFuture<String> asyncTask2 = CompletableFuture.supplyAsync(()->{
    return "k";
}, poolExecutor);

CompletableFuture<String> combinedFutureObj = asyncTask1.thenCombine(asyncTask2, (Integer val1, String val2) -> {
    return val1 + val2;
});

System.out.println(combinedFutureObj.get());
//10k
0
Subscribe to my newsletter

Read articles from Chetan Datta directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Chetan Datta
Chetan Datta

I'm someone deeply engrossed in the world of software developement, and I find joy in sharing my thoughts and insights on various topics. You can explore my exclusive content here, where I meticulously document all things tech-related that spark my curiosity. Stay connected for my latest discoveries and observations.