Executor Framework in Java

Executor is a framework provided by the JDK (since Java 1.5) that simplifies running tasks asynchronously. At a high level, the Executor classes provide an abstraction for assigning tasks to a pool of threads.

The java.util.concurrent package defines three main executor interfaces:

  • Executor

  • ExecutorService

  • ScheduledExecutorService

Before we look deeper into the Executor libraries, we need to look at one more interface and understand its API : java.util.concurrent.Future

Future

Future represents the result of an asynchronous computation in Java: it's not the result itself but a placeholder for the result of an asynchronous operation. A Future instance can exist even while the computation is still going on and its underlying result isn't available yet. This allows a method that performs an asynchronous operation to return immediately with a Future instance while the actual task is still running.

Future instances go hand-in-hand with Callable objects, which represent asynchronous tasks. Executors that accept tasks in the form of a Callable typically return the corresponding result in the form of a Future.

Note that the task may still be running when the Future instance representing its result is handed to the client, and there is always a possibility that executing the task results in an exception. In such cases, attempting to extract the result from the Future instance will throw an ExecutionException. ExecutionException is just a wrapper over the actual exception, and the underlying exception can be extracted by using the following method :

 public Throwable getCause()
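As a minimal sketch of this unwrapping (the class name ExecutionExceptionDemo and the failing task are made up for illustration), the following submits a task that always throws and recovers the original exception from the ExecutionException:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutionExceptionDemo {

    // Submits a task that always fails and returns the cause recovered from the Future.
    static Throwable failureCause() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> failing = () -> {
                throw new IllegalStateException("task failed");
            };
            Future<Integer> future = pool.submit(failing);
            try {
                future.get();
                return null; // not reached: the task always throws
            } catch (ExecutionException e) {
                return e.getCause(); // the exception thrown inside call()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Cause : " + failureCause());
    }
}
```

The exception type seen by the caller of get() is always ExecutionException; the exception the task actually threw is only reachable through getCause().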

To extract results from a Future we can use either of the following methods :

 V get() throws InterruptedException, ExecutionException
 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

If the async operation is done, a call to get returns immediately with the result
If the async operation is still going on, invoking get will cause the thread to be blocked until results are available (or until timeout period elapses)
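The timed variant can be sketched as follows. This is an illustration under assumptions: the class name TimedGetDemo, the latch used to hold the task back, and the 100 ms timeout are all arbitrary choices. The first get gives up with a TimeoutException while the task is still blocked; the second, untimed get waits for the result.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {

    // Times out once on a blocked task, then releases it and returns its result.
    static String getAfterTimeout() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<String> future = pool.submit(() -> {
                release.await(); // the task cannot finish until the latch is opened
                return "done";
            });
            try {
                future.get(100, TimeUnit.MILLISECONDS);
                throw new IllegalStateException("expected a timeout");
            } catch (TimeoutException expected) {
                // The task is still running; timing out does not cancel it.
            }
            release.countDown();
            return future.get(); // blocks until the result is available
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Result : " + getAfterTimeout());
    }
}
```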

To check if the results are available or not, we can use the following method :

 boolean isDone()

Note that isDone() returning true means the execution isn't pending anymore - it doesn't mean that the execution was successful. Even in case of an Exception, isDone() will return true. Even if the task has been cancelled, isDone() will return true as no execution is pending.

To distinguish between cancelled and done operations, we can use this:

 boolean isCancelled()

To cancel execution of a task, we can call this :

 boolean cancel(boolean mayInterruptIfRunning)

Cancellation is really an attempt and there is no guarantee that the task will actually stop. If the task hasn't started execution yet, it will never run once marked as cancelled. If the task is already running, the outcome depends on the argument passed: with false, the running task is left to finish; with true, the thread running the task is interrupted. Even then, cancellation relies on the task responding to interrupts, so a task that ignores interrupts keeps running. The return value is false if the task could not be cancelled (typically because it had already completed) and true otherwise.
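A small sketch of both outcomes, assuming a task that cooperates with interruption (the class CancelDemo and its latches are hypothetical helpers, not part of the JDK):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelDemo {

    // Returns true if a running task observed the interrupt sent by cancel(true).
    static boolean runningTaskSeesInterrupt() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    TimeUnit.SECONDS.sleep(30);
                } catch (InterruptedException e) {
                    interrupted.countDown(); // cooperative response to cancel(true)
                }
            });
            started.await(); // make sure the task is really running
            boolean cancelled = future.cancel(true);
            return cancelled && future.isCancelled()
                    && interrupted.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    // Returns what cancel(true) reports for a task that has already completed.
    static boolean cancelAfterCompletion() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> "ok");
            future.get();               // wait for normal completion
            return future.cancel(true); // nothing is left to cancel
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Running task interrupted : " + runningTaskSeesInterrupt());
        System.out.println("Cancel after completion : " + cancelAfterCompletion());
    }
}
```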

Here are a few code snippets that illustrate how Future implementations work : (For now ignore how we are actually getting the Future object from the threadpool)

static final ExecutorService threadpool = Executors.newSingleThreadExecutor();

public static void main(String[] args) throws Exception {

    FactorialCalculator task = new FactorialCalculator(15);
    Future<Long> future =  threadpool.submit(task); // call() returns Long, so this is a Future<Long>
    System.out.println("Task is submitted");

    long start = System.nanoTime();
    while (!future.isDone()) {
        System.out.println("Task executing....");
        Thread.sleep(1); //sleep for 1 millisecond before checking again
    }
    long end = System.nanoTime();
    System.out.println("Time Taken (ms) to get results : "+ (end-start)/1_000_000 + " result:"+ future.get());
    threadpool.shutdown(); // lets the JVM exit once the task is done
}

private static class FactorialCalculator implements Callable<Long> {

    private final int number;
    public FactorialCalculator(int number) {
        this.number = number;
    }

    @Override
    public Long call() {
        long output = 0;
        try {
            output =  factorial(number);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }            
        return output;
    }

    private long factorial(int number) throws InterruptedException {
        long result = 1;
        while (number > 0) {
            TimeUnit.MILLISECONDS.sleep(1);
            result = result * number;
            number--;
        }
        return result;
    }
}

Output:

 Task is submitted
 Task executing....
 Task executing....
 Task executing....
 Task executing....
 Task executing....
 Task executing....
 Task executing....
 Task executing....
 Task executing....
 Task executing....
 Task executing....
 Task executing....
 Task executing....
 Task executing....
 Task executing....
 Time Taken (ms) to get results : 26 result:1307674368000

Let's also try cancelling a task midway and extracting the results from the cancelled task

public static void main(String[] args) throws Exception {

    FactorialCalculator task = new FactorialCalculator(15);
    Future<Long> future =  threadpool.submit(task);
    System.out.println("Task is submitted");

    future.cancel(true);
    System.out.println("Task Cancelled : "+ future.isCancelled());
    System.out.println("Task Done : "+ future.isDone());
    System.out.println("Task results :"+ future.get());
}

Output:

Task is submitted
Task Cancelled : true
Task Done : true
java.util.concurrent.CancellationException
     at java.util.concurrent.FutureTask.report(FutureTask.java:121)
     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
     at Test.main(Test.java:37)

So trying to access the result of a cancelled task results in a CancellationException being thrown to the client. In general, it's good practice to check isDone() and isCancelled() before attempting to extract results from a Future.

Thread Pool

Now let's have a look at the various thread pool backed Executor implementations.

Executor

Executor executes a Runnable task. The aim of the Executor interface is to decouple the process of submitting a task from the process of creating and managing the threads that execute it.

Executor is a functional interface (it declares a single abstract method), with just the following method :

 void execute(Runnable command)

A client of an executor only controls the submission of tasks. How the executor internally creates and manages threads is at the discretion of the implementor.

Note that the Executor interface does not strictly require that execution be asynchronous. In the simplest case, an executor can run the submitted task immediately in the caller's thread:

 class MyExecutor implements Executor {
    public void execute(Runnable r) {
      r.run();
    }
 }

Executor is designed to be a drop-in replacement for a common thread-creation idiom. If r is a Runnable object, and e is an Executor object you can replace

 new Thread(r).start();

with

 e.execute(r);
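Because Executor has a single method, both styles can be written as one-liners. The sketch below is only an illustration (the names ExecutorStylesDemo, DIRECT and THREAD_PER_TASK are made up); it checks which thread actually ran the task:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorStylesDemo {

    // Runs each task synchronously in the caller's thread.
    static final Executor DIRECT = Runnable::run;

    // Starts a new thread for each task.
    static final Executor THREAD_PER_TASK = r -> new Thread(r).start();

    // Returns true if the executor ran the task in the thread that called execute().
    static boolean runsInCallerThread(Executor executor) {
        AtomicReference<Thread> runner = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            runner.set(Thread.currentThread());
            done.countDown();
        });
        try {
            done.await(); // wait until the task has run, whichever thread ran it
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return runner.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("DIRECT runs in caller thread : " + runsInCallerThread(DIRECT));
        System.out.println("THREAD_PER_TASK runs in caller thread : " + runsInCallerThread(THREAD_PER_TASK));
    }
}
```

The client code in runsInCallerThread is identical for both executors, which is the decoupling the interface is meant to provide.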

One problem with Executor is that it defines no standard way to shut down the underlying thread pool. The shutdown mechanism is left to the implementor, which leaves key questions unanswered (for example: what happens to a task that isn't complete yet when shutdown is initiated for the thread pool behind the Executor?).

ExecutorService

ExecutorService extends Executor and supplements Executor's execute method with similar but more versatile methods. Unlike an Executor, an ExecutorService implementation can accept tasks in the form of both Runnable and Callable objects.

The submit method of ExecutorService accepts a Runnable just like the execute method of Executor, but returns a Future representing the task :

 Future<?> submit(Runnable task)

Invoking get() on the returned Future instance will return null (once the execution is complete)

Alternately, ExecutorService allows us to provide a pre-decided return value (when Future.get() is invoked) as well

 <T> Future<T> submit(Runnable task, T result)

ExecutorService can accept tasks in the form of Callable objects as well.

 <T> Future<T> submit(Callable<T> task)

ExecutorService also lets us submit a number of tasks at one go:

 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

The returned list has the same number of Futures as the input Collection of tasks, and the Futures appear in the same order as the corresponding tasks in the input Collection. Note that the method returns only when isDone() is true for all the Futures in the result, so invoking get() on the returned Futures is guaranteed not to block. (However, get() can still throw ExecutionException or CancellationException, as a task might have failed with an exception or been cancelled.)

ExecutorService doesn't provide any guarantee about its behavior, and the results are undefined if the input collection is modified while the invokeAll method is still in progress.
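Here is a short sketch of invokeAll, assuming five trivial tasks that each square a number (the class name InvokeAllDemo is hypothetical). The results come back in submission order regardless of which pool thread ran which task:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {

    // Submits five tasks in one go and collects their results in input order.
    static List<Integer> squares() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= 5; i++) {
                final int n = i;
                tasks.add(() -> n * n);
            }
            List<Future<Integer>> futures = pool.invokeAll(tasks); // returns when all are done
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // does not block: every Future is already done
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squares()); // prints [1, 4, 9, 16, 25]
    }
}
```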

invokeAll method has an overloaded timeout version as well :

 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException

In the event of a timeout, tasks that have not completed are cancelled (assuming that they respond to interrupts) and isCancelled() would return true for the corresponding Futures
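The timeout behaviour can be sketched like this, assuming one task that returns at once and one that sleeps far longer than the timeout (the class name InvokeAllTimeoutDemo and the 500 ms / 30 s values are arbitrary choices for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllTimeoutDemo {

    // Runs a quick and a slow task with a timeout; reports isCancelled() for each Future.
    static List<Boolean> cancelledFlags() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> "quick");
            tasks.add(() -> {
                TimeUnit.SECONDS.sleep(30); // interrupted when the timeout cancels the task
                return "slow";
            });
            List<Future<String>> futures = pool.invokeAll(tasks, 500, TimeUnit.MILLISECONDS);
            List<Boolean> flags = new ArrayList<>();
            for (Future<String> future : futures) {
                flags.add(future.isCancelled());
            }
            return flags;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // The quick task should report false and the slow one true.
        System.out.println("Cancelled flags : " + cancelledFlags());
    }
}
```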

Unlike Executor, ExecutorService doesn't leave shutdown behavior unspecified and provides the following methods to aid in termination process

 void shutdown()

This initiates a graceful shutdown - no new tasks are accepted after invoking shutdown(), but all previously submitted tasks will be completed. If a new task is submitted after invoking shutdown(), a RejectedExecutionException is thrown to the task submitter.

ExecutorService also provides a mechanism for immediate shutdown/force shutdown

 List<Runnable> shutdownNow()

shutdownNow() attempts to stop all actively executing tasks immediately. Note that immediate shutdown isn't guaranteed and is only on a best-effort-basis. If the actual tasks ignore interrupts, they might never terminate. Irrespective of whether the tasks are terminated or not, the method returns immediately. The returned list contains all the tasks that were awaiting execution when the method was invoked.

Shutting down an executor service is a one-shot process. Once shutdown has been invoked, there's no way to undo it.

To check whether shutdown has been initiated on an ExecutorService :

 boolean isShutdown()

This method returns true after either shutdown() or shutdownNow() has been called, even if tasks are still running.

If you want a thread to wait until an ExecutorService has terminated after a shutdown request, you can use the following method :

 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException

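awaitTermination() blocks the calling thread until the earliest of the following :

  • all tasks have completed execution after a shutdown request

  • the timeout elapses

  • the calling thread is interrupted

The return value is true if the executor terminated and false if the timeout elapsed first.

To check whether an ExecutorService has terminated (that is, all tasks have completed following a shutdown), use: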

 boolean isTerminated()

Note that isTerminated is never true unless either shutdown or shutdownNow was called first.

Note that there isn't any overloaded awaitTermination method which waits indefinitely without a timeout. If you want to wait indefinitely some of the potential options are :

 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

or

while (!executorService.isTerminated()) {
    executorService.awaitTermination(timeoutPeriod, TimeUnit.SECONDS);
}

Let's look at sample code showing how an ExecutorService implementation behaves :

  • We submit 5 tasks (each taking around 2 seconds) to a single threaded executor.

  • The task responds to interrupts by stopping execution and returning.

  • We shut the executor service down gracefully and wait up to 5 seconds for termination. At that point 2 tasks should be done and the third should be midway.

  • Next we terminate it forcefully. The unfinished task should be cancelled and the remaining 2 tasks should never start.

public static void main(String[] args) throws Exception {

    ExecutorService executor = Executors.newSingleThreadExecutor();
    Runnable r = new Runnable() {            
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+ " started");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                System.err.println(Thread.currentThread().getName()+ " cancelled");
                return;
            }
            System.out.println(Thread.currentThread().getName()+ " done");
        }
    };
    executor.submit(r);
    executor.submit(r);
    executor.submit(r);
    executor.submit(r);
    executor.submit(r);
    try {
        System.out.println("Shutting down executor");
        executor.shutdown();
        System.out.println("isShutdown :"+executor.isShutdown());
        System.out.println("isTerminated :"+executor.isTerminated());

        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("isTerminated :"+executor.isTerminated());
    }
    catch (InterruptedException e) {
       e.printStackTrace();
    }
    finally {
        if (!executor.isTerminated()) {
            System.err.println("cancel un-finished tasks");
        }
        List<Runnable> tasks = executor.shutdownNow();
        System.out.println("shutdown finished");
        System.out.println("isTerminated :"+executor.isTerminated());
        System.out.println("# of tasks yet to run :"+tasks.size());
    }

}

Output:

 Shutting down executor
 pool-1-thread-1 started
 isShutdown :true
 isTerminated :false
 pool-1-thread-1 done
 pool-1-thread-1 started
 pool-1-thread-1 done
 pool-1-thread-1 started
 isTerminated :false
 cancel un-finished tasks
 shutdown finished
 isTerminated :false
 # of tasks yet to run :2
 pool-1-thread-1 cancelled

ScheduledExecutorService

ScheduledExecutorService extends ExecutorService and supplements the methods of its parent with the ability to execute a task after a delay, or to execute it repeatedly at a fixed rate or with a fixed delay. Zero or negative delays are treated as requests for immediate execution, and in those cases ScheduledExecutorService behaves just like its parent (ExecutorService).

Just like ExecutorService, ScheduledExecutorService can work with both Runnable and Callable interfaces when scheduling a task only once.

ScheduledExecutorService provides the following methods to execute a task once with some delay :

 <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
 ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

Note that these methods return immediately, and they return instances of ScheduledFuture rather than Future. ScheduledFuture extends Delayed (in addition to extending Future), which lets us check the time remaining until the task becomes enabled for execution :

 long getDelay(TimeUnit unit)

Once the delay has elapsed, getDelay() returns zero or a negative value.

Let's take a look at this code snippet :

public static void main(String[] args) throws Exception {

    ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
    ScheduledFuture<?> future = service.schedule(new Runnable() {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " : scheduled thread running");
        }            
    }, 5, TimeUnit.SECONDS);

    while(future.getDelay(TimeUnit.SECONDS) >= 0) {
        System.out.println("Seconds Remaining : "+ future.getDelay(TimeUnit.SECONDS));
        TimeUnit.SECONDS.sleep(1);
    }
    service.shutdown();
}

A task is scheduled with a delay of 5 seconds and we are checking after every second how many more seconds are left.

Output:

 Seconds Remaining : 4
 Seconds Remaining : 3
 Seconds Remaining : 2
 Seconds Remaining : 1
 Seconds Remaining : 0
 pool-1-thread-1 : scheduled thread running
 Seconds Remaining : 0

A ScheduledExecutorService can also be used to run a task repeatedly.

There are two options:

 ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

scheduleAtFixedRate creates and executes a periodic action repeatedly. If the initial delay is t1 and the period is t2, the task will run after t1, t1 + t2, t1 + 2t2, t1 + 3t2, ... and so on.

If any run throws an exception, the subsequent runs are suppressed.
If any run takes longer than the configured period, subsequent runs may start late, but under no circumstances do two runs execute concurrently.

and

 ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

scheduleWithFixedDelay creates and executes a periodic action repeatedly. If the initial delay is t1 and the subsequent delay is t2, the first run starts after t1, and every subsequent run starts t2 after the previous run has finished.
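The rule that an exception suppresses all later runs of a periodic task is easy to miss, so here is a sketch of it for scheduleAtFixedRate. The class name PeriodicFailureDemo, the 10 ms period and the choice to fail on the third run are assumptions made for the illustration:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicFailureDemo {

    // Schedules a periodic task that throws on its third run; returns the total number of runs.
    static int runsBeforeSuppression() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
                if (runs.incrementAndGet() == 3) {
                    throw new IllegalStateException("third run failed");
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            try {
                future.get(); // for a periodic task this returns only via an exception
            } catch (ExecutionException e) {
                // e.getCause() is the IllegalStateException thrown by the third run
            }
            TimeUnit.MILLISECONDS.sleep(100); // leave time for further runs, which should not happen
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Runs before suppression : " + runsBeforeSuppression());
    }
}
```

Note that the failure is silent unless someone calls get() on the ScheduledFuture, so a periodic task that should survive errors needs to catch them inside run().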

Here's an example of scheduling at fixed rate:

public static void main(String[] args) throws Exception {

    ScheduledExecutorService service = Executors.newScheduledThreadPool(1);

    System.out.println("Current Time : "+ new Date());
    ScheduledFuture<?> future = service.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " : scheduled task running at "+ new Date());
        }            
    }, 1, 2, TimeUnit.MINUTES);
}

So, the task will be executed first at 1 min after start, then once every 2 mins after that :

Output:

Current Time : Fri Dec 28 00:10:54 EST 2018
pool-1-thread-1 : scheduled task running at Fri Dec 28 00:11:54 EST 2018
pool-1-thread-1 : scheduled task running at Fri Dec 28 00:13:54 EST 2018
pool-1-thread-1 : scheduled task running at Fri Dec 28 00:15:54 EST 2018
pool-1-thread-1 : scheduled task running at Fri Dec 28 00:17:54 EST 2018

Clearly the first run happened with 1 minute delay whereas subsequent runs are scheduled every 2 minutes

What if the task takes too long? Let's take an example where a task takes 3 mins to run, but is configured to run every 1 minute :

public static void main(String[] args) throws Exception {

    ScheduledExecutorService service = Executors.newScheduledThreadPool(2);

    System.out.println("Current Time : "+ new Date());
    ScheduledFuture future = service.scheduleAtFixedRate(new Runnable() {        
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " task started at "+ new Date());
            try {
                TimeUnit.MINUTES.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " task over at "+ new Date());
        }            
    }, 0, 1, TimeUnit.MINUTES);
}

Output:

 Current Time : Fri Dec 28 00:25:30 EST 2018
 pool-1-thread-1 task started at Fri Dec 28 00:25:30 EST 2018
 pool-1-thread-1 task over at Fri Dec 28 00:28:30 EST 2018
 pool-1-thread-1 task started at Fri Dec 28 00:28:30 EST 2018
 pool-1-thread-1 task over at Fri Dec 28 00:31:30 EST 2018

Even though the job was scheduled to run every minute, since the first run took 3 mins, the second run started only after the first one ended (even though there were 2 threads in the pool).

Now let's try scheduling with fixed delay. In this example our task takes 1 minute to run and is scheduled to run with a 2 minute delay from the last task completion :

public static void main(String[] args) throws Exception {

    ScheduledExecutorService service = Executors.newScheduledThreadPool(2);

    System.out.println("Current Time : "+ new Date());
    ScheduledFuture future = service.scheduleWithFixedDelay(new Runnable() {        
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " task started at "+ new Date());
            try {
                TimeUnit.MINUTES.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " task over at "+ new Date());
        }            
    }, 0, 2, TimeUnit.MINUTES);
}

Output:

Current Time : Fri Dec 28 00:30:12 EST 2018
pool-1-thread-1 task started at Fri Dec 28 00:30:12 EST 2018
pool-1-thread-1 task over at Fri Dec 28 00:31:12 EST 2018
pool-1-thread-1 task started at Fri Dec 28 00:33:12 EST 2018
pool-1-thread-1 task over at Fri Dec 28 00:34:12 EST 2018
pool-1-thread-1 task started at Fri Dec 28 00:36:12 EST 2018
pool-1-thread-1 task over at Fri Dec 28 00:37:12 EST 2018

CompletionService

CompletionService was added to the java.util.concurrent package in Java 1.5, along with the rest of the Executor framework. It decouples the production of new asynchronous tasks from the consumption of the results of completed tasks.

One drawback of ExecutorService is that if you are agnostic to the order of the tasks and want to process the results as and when they complete, there is no easy way to do that. The invokeAll method returns only when all tasks are done, whereas with submit/execute, if we add a lot of tasks, there's no way to know which ones are done other than iterating over the Futures and invoking isDone() on each of them. CompletionService attempts to solve this problem by returning results in completion order.

A CompletionService can be thought of as a combination of two queues - the input queue is where tasks pile up, whereas the output queue is where the results of the completed tasks are put. The onus is on the client to figure out which output is related to which input (if that's important to the application's logic); the service on its own doesn't provide any mechanism to do that.

CompletionService can work with both Callable and Runnable based tasks. To submit a task to a CompletionService use either of the following methods:

  Future<V> submit(Callable<V> task)
  Future<V> submit(Runnable task, V result)

Note that submit always returns immediately and completion of the submit method doesn't indicate completion of the actual task. For that we'll have to check the state of the Future object returned.

To get results off the completion queue, use this :

 Future<V> take() throws InterruptedException

Note that results are piled in completion order which isn't necessarily same as submission order. To link a result to a submitted task, you'll need to explicitly put a state in the result which can be used to associate with the input task. However, if you really need to link the results with the input task, you are better off using ExecutorService.

The most common implementation of CompletionService is probably ExecutorCompletionService, which uses a supplied Executor to execute tasks. The ExecutorCompletionService class arranges that submitted tasks are, upon completion, placed on a queue accessible using take.

To create an instance of ExecutorCompletionService, use either of the following constructors :

 public ExecutorCompletionService(Executor executor)
 public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
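To see completion order differ from submission order, consider the sketch below. It is an illustration under assumptions: the class name CompletionOrderDemo is made up, and a latch (rather than a real workload) holds the first task back so that the second one finishes first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Submits a blocked task followed by a quick one; returns results in the order take() yields them.
    static List<String> completionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            service.submit(() -> {
                gate.await(); // held back until the gate opens
                return "submitted first";
            });
            service.submit(() -> "submitted second");

            List<String> order = new ArrayList<>();
            order.add(service.take().get()); // only the second task can have completed so far
            gate.countDown();
            order.add(service.take().get());
            return order;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            gate.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(completionOrder()); // prints [submitted second, submitted first]
    }
}
```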

Note that, by default, an ExecutorCompletionService uses a LinkedBlockingQueue as the completion queue, which is an unbounded queue. This means that adding a result to the queue never blocks, whereas a take operation blocks if the queue is empty. In the real world, worker threads often put results on the completion queue at a faster rate than the result-processing threads drain it, leading to unbounded growth in memory.

Here's a code snippet which simulates this situation, where results are drained more slowly than they are produced:

public static void main(String[] args) throws Exception {

    ExecutorService executor = Executors.newFixedThreadPool(4);
    CompletionService<Integer> service = new ExecutorCompletionService<>(executor);

    Thread t = new Thread() {
        @Override
        public void run() {
            for(int i=0; i<1_000_000_000; i++) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    Future<Integer> future = service.take();
                    Integer value = future.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }                    
            }
        }
    };
    t.start();
    Random random = new Random();
    for(int i=0; i<1_000_000_000; i++) {
        service.submit(new Callable<Integer>(){
            @Override
            public Integer call() throws Exception {
                return random.nextInt();
            }                    
        });    
    }
}

If we connect a profiling tool to this JVM we can clearly see that the memory footprint increases with time.
More often than not, producers and consumers don't move at exactly the same pace. So ideally we need to have some sort of throttling mechanism to make sure that neither the producer nor the consumer gets too far ahead of the other party.

One option is to use the alternate constructor of ExecutorCompletionService which lets us specify a custom BlockingQueue. However, if we look into the source code for ExecutorCompletionService, we can see that we have another problem :

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

Look at the done() method - it puts results into completionQueue by using the add method.

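According to the javadoc for BlockingQueue, the add method inserts the element if it is possible to do so immediately without violating capacity restrictions, and throws an IllegalStateException if no space is currently available.

So, even if we use a bounded blocking queue, we'll end up with an IllegalStateException if the queue gets full.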
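The behaviour of add on a full bounded queue can be checked in isolation. This minimal sketch uses an ArrayBlockingQueue of capacity 1 (the class name BoundedQueueAddDemo is hypothetical) and contrasts add with offer:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueAddDemo {

    // Returns true if add() on a full bounded queue throws IllegalStateException.
    static boolean addThrowsWhenFull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first"); // fits: the capacity is 1
        try {
            queue.add("second"); // no space left
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // offer() reports a full queue through its return value instead of throwing.
    static boolean offerWhenFull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        return queue.offer("second");
    }

    public static void main(String[] args) {
        System.out.println("add throws when full : " + addThrowsWhenFull());
        System.out.println("offer when full : " + offerWhenFull());
    }
}
```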

An alternative is to restrict producers from submitting too many tasks to begin with. Use a semaphore to determine maximum number of objects in input queue. Before putting a new task, producer needs to acquire a permit from the semaphore. Once a consumer starts processing, it releases a permit back to the semaphore. This ensures that producers can't simply go on piling up tasks if consumers are too slow to respond.
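One possible shape for the semaphore approach is sketched below. It is not a definitive implementation: ThrottledSubmitter, maxQueued and freeSlots are hypothetical names, and the permit is released as soon as a worker starts the task, as described above, so the semaphore bounds the number of tasks waiting in the input queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThrottledSubmitter {
    private final ExecutorService pool;
    private final Semaphore slots; // one permit per task allowed to wait in the input queue

    ThrottledSubmitter(ExecutorService pool, int maxQueued) {
        this.pool = pool;
        this.slots = new Semaphore(maxQueued);
    }

    // Blocks the producer while maxQueued tasks are already waiting.
    void submit(Runnable task) throws InterruptedException {
        slots.acquire();
        try {
            pool.execute(() -> {
                slots.release(); // a consumer has started processing this task
                task.run();
            });
        } catch (RuntimeException e) {
            slots.release(); // the task was rejected, so it occupies no slot
            throw e;
        }
    }

    int freeSlots() {
        return slots.availablePermits();
    }

    // Demo: returns {free slots while two tasks wait, tasks completed, free slots at the end}.
    static int[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        ThrottledSubmitter submitter = new ThrottledSubmitter(pool, 2);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        try {
            submitter.submit(() -> { // occupies the only worker thread
                started.countDown();
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
            started.await();
            submitter.submit(completed::incrementAndGet); // waits in the queue
            submitter.submit(completed::incrementAndGet); // waits in the queue
            int freeWhileBlocked = submitter.freeSlots(); // a third submit would block here
            gate.countDown();
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
            return new int[] { freeWhileBlocked, completed.get(), submitter.freeSlots() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            gate.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("Free slots while blocked : " + result[0]);
        System.out.println("Tasks completed : " + result[1]);
        System.out.println("Free slots at the end : " + result[2]);
    }
}
```

A variant that releases the permit in a finally block after task.run() would instead bound the number of tasks that are queued or running.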
But what if we have 3 entities instead of two :

Producer : puts tasks on the input queue

Consumer: runs tasks and puts results on completion queue

Result-Processor: Takes completed results from completion queue and performs further processing

In this case, throttling between producers and consumers would have no impact on completion queue getting full. Once queue is full, consumers will start discarding results as those threads would run into IllegalStateException. If we rely on count of number of tasks submitted and invoke take() that many times, it would block indefinitely as some results are lost and number of results on completion queue is less than the number of tasks submitted.

There's even a post by Doug Lea acknowledging that bounded blocking queues can't be used reliably in ExecutorCompletionService.

Executors

Executors is essentially a factory class for creating different types of Executor/ExecutorService/ScheduledExecutorService implementations.

To create a single threaded ExecutorService/ScheduledExecutorService operating off an unbounded queue where tasks are being piled up, we can invoke the following static methods of Executors class :

 public static ExecutorService newSingleThreadExecutor()
 public static ScheduledExecutorService newSingleThreadScheduledExecutor()

Note that even if the executor is single threaded, if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time.

In most real world applications, however, we would want a thread pool backed by configurable number of threads. We can use the following methods to achieve that :

 public static ExecutorService newFixedThreadPool(int nThreads)
 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until the executor is explicitly shutdown.

We can also have executor backed thread pools with dynamic number of running/runnable threads

 public static ExecutorService newCachedThreadPool()

newCachedThreadPool creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available. If no existing thread is available, a new thread will be created and added to the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources. However, if the number of tasks is too high, a cached thread pool might create far too many threads, which would degrade performance.

If we look at the internal implementation of newCachedThreadPool() :

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

The max number of threads is set to Integer.MAX_VALUE which can potentially be problematic

Here's a sample code using a fixed thread pool based executor :

public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(4);            
    Runnable r = new Runnable() {
        @Override
        public void run() {
            int total = 0;
            for (int j=0; j< 1_000_000_000; j++) {
                total = total + j;
            }
        }                
    };
    for (int i=0; i<1_000_000_000; i++) {
        executor.submit(r);
    }
}

By connecting VisualVM to the process and monitoring the threads, we can confirm that at no point of time did the number of threads exceed 4 even though we submitted a billion tasks

Let's run the same code after changing the implementation from a fixed thread pool to a cached thread pool :

public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newCachedThreadPool();        
    Runnable r = new Runnable() {
        @Override
        public void run() {
            int total = 0;
            for (int j=0; j< 1_000_000_000; j++) {
                total = total + j;
            }
        }                
    };
    for (int i=0; i<1_000_000_000; i++) {
        executor.submit(r);
    }
}

With the cached thread pool, the number of live threads climbs past 3000.

Clearly running 3000+ threads isn't good for performance, as the cost of context switching would wipe away the benefits of running multiple threads in parallel.

Almost all the factory methods in Executors have an overload that takes a ThreadFactory argument. A ThreadFactory helps us customize the threads that are part of the executor's thread pool.
To access the default thread factory, we can use this :

 public static ThreadFactory defaultThreadFactory()

Let's try implementing a custom thread factory where all the threads in the executor pool will be daemon threads :

public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            return thread;
        }
    });    
    Runnable r = new Runnable() {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+ " : Running");
        }                
    };
    for (int i=0; i<5; i++) {
        executor.submit(r);
    }
    TimeUnit.SECONDS.sleep(2);
}

By running it we can clearly see that even though we haven't shut down the pool explicitly, the process terminates once the main thread is done, which indicates that the threads in the pool were all daemon threads.
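A custom factory doesn't have to build its threads from scratch. As a sketch, the factory below delegates to the default thread factory and only adjusts the name and the daemon flag. The class name NamedDaemonThreadFactory and its prefix parameter are hypothetical, chosen just for this example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory: reuses the default factory and customizes its threads.
public class NamedDaemonThreadFactory implements ThreadFactory {

    private final ThreadFactory delegate = Executors.defaultThreadFactory();
    private final AtomicInteger counter = new AtomicInteger(1);
    private final String prefix;

    public NamedDaemonThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = delegate.newThread(r);   // default group and priority
        thread.setName(prefix + "-" + counter.getAndIncrement());
        thread.setDaemon(true);                  // don't keep the JVM alive
        return thread;
    }
}
```

It can be passed to any factory method that accepts a ThreadFactory, for example Executors.newFixedThreadPool(3, new NamedDaemonThreadFactory("worker")). Meaningful thread names make the pool much easier to spot in tools like VisualVM.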

Executors also provides utility methods to convert a Runnable into a Callable. This is particularly useful when using legacy, pre-Java 1.5 code with executors :

 public static Callable<Object> callable(Runnable task)
 public static <T> Callable<T> callable(Runnable task, T result)
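Here is a small sketch of the two-argument variant in use. The class CallableAdapterDemo and the helper runAsCallable are names invented for this example; the Runnable stands in for a piece of legacy code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableAdapterDemo {

    // Wraps a Runnable into a Callable that yields `result` once the task has run.
    static String runAsCallable(Runnable legacyTask, String result) throws Exception {
        Callable<String> callable = Executors.callable(legacyTask, result);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(callable);
            return future.get();   // blocks until the legacy task has finished
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Runnable legacyTask = () -> System.out.println("legacy task running");
        System.out.println(runAsCallable(legacyTask, "done"));
    }
}
```

The single-argument variant behaves the same way, except that the resulting Callable returns null.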

While the Executors factory methods are popular, they are not the only way to create a thread pool backed executor. We can also instantiate ThreadPoolExecutor directly (which is what the factory methods in the Executors class do internally). ThreadPoolExecutor implements ExecutorService. Instantiating ThreadPoolExecutor directly gives us more control over the behavior of the thread pool implementation.

Here are some of the features that can be configured through a ThreadPoolExecutor :

  1. Core and maximum pool size:
    Let's say the core pool size is set to C and the maximum pool size is set to M. Let's assume the number of threads currently in the pool is T.
    When a new task is submitted,
    if T < C, a new thread is created to service the task, even if other threads in the pool are idle
    if T >= C, the task is placed in the work queue, where it waits for the next thread that becomes free
    if the queue cannot accept the task (it is full) and T < M, a new thread is created to service the task
    if the queue cannot accept the task and T = M, the task is rejected
    If C == M, it becomes a fixed thread pool.
    Note that with an unbounded queue the pool never grows beyond C threads, because the queue never fills up. If we want idle threads to be discarded even when the client of the pool doesn't invoke shutdown, we can either set the core pool size to 0 together with a direct hand-off queue such as SynchronousQueue (this is what the cached thread pool does), or call allowCoreThreadTimeOut(true).

  2. keep alive time :
    If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime. This provides a means of reducing resource consumption when the pool is not being actively used. If the pool becomes more active later, new threads will be constructed.

  3. queueing :
    ThreadPoolExecutor allows us to specify the BlockingQueue implementation used to transfer and hold submitted tasks. If this is a bounded queue and it is full, new tasks cause additional threads to be created up to the maximum pool size. Once that limit has been reached as well, new tasks are rejected.

  4. handling rejected tasks :
    A task may be rejected if
      - it was submitted after the executor was shut down, or
      - the executor uses finite bounds for both the maximum number of threads and the queue capacity, and is saturated
    ThreadPoolExecutor allows us to specify a RejectedExecutionHandler which deals with the rejected tasks. If none is specified, the default handler throws a RejectedExecutionException. A common way to handle tasks rejected because the queue is full is to put them back into the queue after a delay or with a blocking call.

  5. Hook methods :
    ThreadPoolExecutor class provides protected overridable beforeExecute and afterExecute methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated() can be overridden to perform any special processing that needs to be done once the Executor has fully terminated.
    If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.
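To see how pool size, queueing and rejection interact, here is a minimal sketch with a deliberately tiny pool : core size 1, maximum size 2 and a bounded queue of capacity 1. The class SaturationDemo and the helper submitBlockingTasks are names invented for this example. Every task blocks on a latch, so no thread becomes free while we submit. The pool uses the default rejection behavior, which throws RejectedExecutionException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SaturationDemo {

    // Returns one line per submitted task describing how the pool reacted.
    static List<String> submitBlockingTasks(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();   // hold the worker until all tasks are submitted
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= tasks; i++) {
            try {
                pool.execute(blocking);
                log.add("task " + i + ": threads=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                log.add("task " + i + ": rejected");
            }
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String line : submitBlockingTasks(4)) {
            System.out.println(line);
        }
    }
}
```

With four tasks, the first one starts the core thread, the second one waits in the queue, the third one finds the queue full and triggers a second thread, and the fourth one is rejected because both the queue and the pool are at their limits.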

To instantiate a ThreadPoolExecutor, use any of the following constructors :

 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

The core and max pool size are typically set through constructor args. However, even after constructing a thread pool, we can manipulate the pool sizes dynamically using the following methods:

 void setCorePoolSize(int corePoolSize)
 void setMaximumPoolSize(int maximumPoolSize)

The same applies to keep alive time. It can be set through the constructor as well as changed dynamically using the following method:

 void setKeepAliveTime(long time, TimeUnit unit)
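As a rough sketch, growing a pool at runtime could look like the following. The class ResizeDemo and the helper grow are hypothetical names, and the helper assumes the new size is at least as large as the current maximum. The order of the two calls matters : setMaximumPoolSize rejects a value below the core size, and newer JDKs also reject a core size above the maximum, so when growing we raise the maximum first.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ResizeDemo {

    // Grows the pool to `newSize` threads (assumed >= the current maximum)
    // and shortens the idle timeout for threads above the core size.
    static ThreadPoolExecutor grow(ThreadPoolExecutor pool, int newSize) {
        pool.setMaximumPoolSize(newSize);   // raise the ceiling first
        pool.setCorePoolSize(newSize);      // then raise the core size
        pool.setKeepAliveTime(10, TimeUnit.SECONDS);
        return pool;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        grow(pool, 4);
        System.out.println("core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s");
        pool.shutdown();
    }
}
```

When shrinking a pool, the calls would go in the opposite order : lower the core size first, then the maximum.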

Rejected tasks can be handled by configuring a RejectedExecutionHandler, which requires implementing the following method :

 void rejectedExecution(Runnable r, ThreadPoolExecutor executor)

The first param is the task which got rejected while the second param is a reference to the executor.
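As an illustration, here is a sketch of a handler that puts a rejected task back into the queue with a blocking call, so the submitting thread waits until there is room. The names BlockingRejectionDemo, BlockWhenFull and runThroughTinyPool are made up for this example. Treat this as one possible approach under simple assumptions (a single submitting thread, shutdown only after all submissions), not as a production-ready handler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingRejectionDemo {

    // Hypothetical handler: blocks the submitter until the queue has room.
    static class BlockWhenFull implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                // Nothing will ever run the task, so fail loudly instead.
                throw new RejectedExecutionException("executor has been shut down");
            }
            try {
                executor.getQueue().put(r);   // blocking call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for queue space", e);
            }
        }
    }

    // Pushes `tasks` short tasks through a tiny pool and returns how many completed.
    static int runThroughTinyPool(int tasks) throws InterruptedException {
        AtomicInteger completed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), new BlockWhenFull());
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(5);   // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed : " + runThroughTinyPool(10));
    }
}
```

Because the submitter blocks instead of failing, none of the tasks are lost even though the pool has one thread and room for only one queued task. The price is that the submitting thread is slowed down to the pace of the pool.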

If we want some common behavior every time before a task is executed, we can override the following method :

 protected void beforeExecute(Thread t, Runnable r)

The Runnable param is the task that is about to be executed, whereas the Thread param is the thread that will execute it.

Similarly, if we want some common behavior every time after a task is executed, we can override the following method :

 protected void afterExecute(Runnable r, Throwable t)

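Note that this doesn't accept a Thread as param because afterExecute is invoked by the same thread that executed the task, so Thread.currentThread() identifies it. If the task terminated abnormally with a RuntimeException or Error, the Throwable is available through the second argument; otherwise it is null. For tasks passed to submit, exceptions are captured in the returned Future instead, so the Throwable argument is null even if the task failed.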

Note that the default implementations of beforeExecute and afterExecute do nothing (which is also why these methods are protected). To use these methods, we need to subclass ThreadPoolExecutor and override them

 protected void beforeExecute(Thread t, Runnable r) { }
 protected void afterExecute(Runnable r, Throwable t) { }

Similarly, there's a callback method executed when the pool is terminated :

 protected void terminated() { }
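Putting the three hooks together, here is a minimal sketch of a subclass that gathers simple statistics. The class CountingThreadPoolExecutor and its fields are hypothetical names used only for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that counts tasks through the hook methods.
public class CountingThreadPoolExecutor extends ThreadPoolExecutor {

    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();
    volatile boolean terminatedCalled;

    public CountingThreadPoolExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();    // runs in the worker thread, before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet();   // runs in the same worker thread, after the task
    }

    @Override
    protected void terminated() {
        terminatedCalled = true;      // runs once, when the pool has fully terminated
        super.terminated();
    }

    public static void main(String[] args) throws InterruptedException {
        CountingThreadPoolExecutor pool = new CountingThreadPoolExecutor(2);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("started=" + pool.started + " finished=" + pool.finished
                + " terminated=" + pool.terminatedCalled);
    }
}
```

The counters are atomic because the hooks are called concurrently from different worker threads. Calling the super implementations is a good habit in case several subclasses are layered on top of each other.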

Note that the core pool threads are not started by default until a task is submitted. To start any/all of the core pool threads use:

 boolean prestartCoreThread()
 int prestartAllCoreThreads()
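A short sketch makes the effect visible by reading the pool size before and after prestarting. The class PrestartDemo and the helper poolSizesAroundPrestart are names invented for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PrestartDemo {

    // Returns { pool size before prestart, threads started, pool size after prestart }.
    static int[] poolSizesAroundPrestart(int coreThreads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreThreads, coreThreads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            int before = pool.getPoolSize();               // no task yet, so no threads
            int started = pool.prestartAllCoreThreads();   // start idle core threads
            int after = pool.getPoolSize();
            return new int[] { before, started, after };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizesAroundPrestart(3);
        System.out.println("before=" + sizes[0] + " started=" + sizes[1] + " after=" + sizes[2]);
    }
}
```

Prestarting is mostly useful when the pool is created with a queue that already contains tasks, or when we want to avoid the thread creation cost on the first few submissions.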
