Java : Executor framework

“Thread” to “Thread per Request” to “Thread Pool” :

Single Thread :

A single thread can handle only one user at a time. The other users must wait in a queue until the current user has finished his or her task.
In terms of CPU and I/O usage, a single thread does not fully utilize the machine, because while the CPU is busy the I/O is usually idle, and vice versa.

 Advantage
• Straightforward
• Easy to develop

Disadvantage
• Wastes time
• CPU and I/O are not fully utilized

Thread per Request :
Thread per request (or per connection) was introduced to make fuller use of system resources and to give users the responsiveness that a single thread cannot provide. In this architecture the system creates a thread for each request from a client. In other words, the system creates as many threads as there are requests. The only limitation is software and hardware capacity.

Advantage
• No waiting users
• Maximize system resources

Disadvantage
• It is a little hard to develop unless you follow the guidelines strictly
• A lot of overhead in creating and destroying threads for many requests or connections
• Server might crash if the number of requests is beyond the server’s capacity

Thread Pool :
A thread pool pre-defines the number of threads running in its pool. Let’s say we have 2 threads in the pool; then the system is capable of serving only 2 requests at the same time. Other requests must wait in a queue until one of the 2 threads in the pool finishes its work.
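
The two-thread scenario above can be sketched with a fixed pool. This is only an illustration: the class name TwoThreadPoolDemo, the 20 ms sleep standing in for real work, and the request count are assumptions, not part of the original article. The method records how many simulated requests were ever in progress at once; with a pool of 2 that number should never exceed 2, while the remaining requests wait in the pool's queue.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TwoThreadPoolDemo {

    // Runs `requests` simulated requests on a pool of 2 threads and returns
    // the highest number of requests that were in progress at the same time.
    public static int maxConcurrent(int requests) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < requests; i++) {
            pool.execute(() -> {
                int now = active.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // simulated work (assumed duration)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                active.decrementAndGet();
            });
        }
        pool.shutdown();                             // no more requests
        pool.awaitTermination(10, TimeUnit.SECONDS); // wait for queued work
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrency = " + maxConcurrent(6));
    }
}
```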

Advantage
• With an optimal number of threads in the pool, the chance of a server crash is greatly reduced
• Few requests are left waiting in the queue

Disadvantage
• Must do performance tuning for specific hardware capacity
• Difficult to develop unless you take advantage of the Java concurrency package

Risks of using thread pools
(http://www.ibm.com/developerworks/java/library/j-jtp0730.html)

While the thread pool is a powerful mechanism for structuring multithreaded applications, it is not without risk. Applications built with thread pools are subject to all the same concurrency risks as any other multithreaded application, such as synchronization errors and deadlock, and a few other risks specific to thread pools as well, such as pool-related deadlock, resource thrashing, and thread leakage.

Deadlock
With any multithreaded application, there is a risk of deadlock. A set of processes or threads is said to be deadlocked when each is waiting for an event that only another process in the set can cause.

More info :

https://myshadesofgray.wordpress.com/2012/10/06/deadlock-detection-in-java/

Resource thrashing
If the thread pool size is not tuned properly, threads consume numerous resources, including memory and other system resources. While the scheduling overhead of switching between threads is small, with many threads context switching can become a significant drag on your program’s performance.

Concurrency errors
Thread pools and other queuing mechanisms rely on the use of wait() and notify() methods, which can be tricky. If coded incorrectly, it is possible for notifications to be lost, resulting in threads remaining in an idle state even though there is work in the queue to be processed. Great care must be taken when using these facilities.

Thread leakage
A significant risk in all kinds of thread pools is thread leakage, which occurs when a thread is removed from the pool to perform a task, but is not returned to the pool when the task completes. One way this happens is when the task throws a RuntimeException or an Error. If the pool class does not catch these, then the thread will simply exit and the size of the thread pool will be permanently reduced by one. When this happens enough times, the thread pool will eventually be empty, and the system will stall because no threads are available to process tasks.
Tasks that permanently stall, such as those that potentially wait forever for resources that are not guaranteed to become available or for input from users who may have gone home, can also cause the equivalent of thread leakage.

Request overload
It is possible for a server to simply be overwhelmed with requests. In this case, we may not want to queue every incoming request to our work queue, because the tasks queued for execution may consume too many system resources and cause resource starvation. It is up to you to decide what to do in this case; in some situations, you may be able to simply throw the request away, relying on higher-level protocols to retry the request later, or you may want to refuse the request with a response indicating that the server is temporarily busy.
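
One way to refuse work instead of queueing it without limit is a ThreadPoolExecutor with a bounded queue. The sketch below assumes a deliberately tiny pool (1 thread, queue capacity 1) and a hypothetical class name OverloadDemo; it relies on the default rejection policy, which throws RejectedExecutionException when neither a thread nor a queue slot is free. A real server would pick sizes by measurement and might answer "temporarily busy" in the catch block.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OverloadDemo {

    // Submits `requests` tasks to a pool with 1 thread and a queue of 1
    // and returns how many of them were refused.
    public static int countRejected(int requests) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1)); // default policy: AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < requests; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the worker so the queue stays full
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // a server could reply "busy, retry later" here
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        // 1 task running + 1 task queued, the remaining 3 are refused
        System.out.println("rejected = " + countRejected(5));
    }
}
```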

So there is no need to write your own

In early versions of Java (1.4 or earlier), developers needed to implement concurrent applications—including thread pool logic—themselves using low-level language constructs and the Java Thread API. The results were often poor. The nature of the Java Thread API often led unwitting programmers to develop code that introduced hard-to-debug programming errors.
In Java 5.0, Sun introduced the Java concurrency utilities (JSR-166) to address these issues and provide a standard set of APIs to create concurrent applications.

Introduction to Java Executor Framework

Runnable vs Callable<T>
The Callable interface is similar to Runnable, in that both are designed for classes whose instances are potentially executed by another thread. A Runnable, however, does not return a result and cannot throw a checked exception.

Runnable compared with Callable<T>:
• Introduced: Runnable in Java 1.0; Callable<T> in Java 1.5 as part of the java.util.concurrent library
• Type parameter: Runnable cannot be parameterized; Callable<T> is a parameterized type whose type parameter indicates the return type of its call() method
• Method to implement: classes implementing Runnable implement run(); classes implementing Callable<T> implement call()
• Return value: Runnable.run() returns no value; Callable.call() returns a value of type T
• Checked exceptions: run() cannot throw checked exceptions; call() can
• Declaration: public interface Runnable { void run(); } versus public interface Callable<V> { V call() throws Exception; }

 

Note :

To convert a Runnable to a Callable, use the following utility method provided by the Executors class:

Callable<Object> callable = Executors.callable(task); // task is a Runnable

A Callable, however, must be executed using an ExecutorService instead of a Thread, as shown below.

result = exec.submit(aCallable).get(); 
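
Putting the two snippets together, a minimal sketch might look like the following. The class name RunnableToCallableDemo and the strings used are made up for illustration; the two-argument overload Executors.callable(Runnable, T) is used so that the adapted Callable returns a visible result.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunnableToCallableDemo {

    public static String run() throws Exception {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            StringBuilder log = new StringBuilder();
            Runnable task = () -> log.append("ran;");

            // Adapt the Runnable; the resulting Callable returns "done".
            Callable<String> aCallable = Executors.callable(task, "done");

            // submit() returns a Future; get() blocks until call() has returned.
            String result = exec.submit(aCallable).get();
            return log + result;
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "ran;done"
    }
}
```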

 

Thread Pool and Executor

Java 5.0 comes with its own thread pool implementation – within the Executor and ExecutorService interfaces. This makes it easier for you to use thread pools within your own programs.


 

The java.util.concurrent package defines three executor interfaces:

The Executor Interface

The Executor interface provides a single method, execute, designed to be a drop-in replacement for a common thread-creation idiom. If r is a Runnable object, and e is an Executor object, you can replace

(new Thread(r)).start();

with

e.execute(r);

However, the definition of execute is less specific. The low-level idiom creates a new thread and launches it immediately. Depending on the Executor implementation, execute may do the same thing, but is more likely to use an existing worker thread to run r, or to place r in a queue to wait for a worker thread to become available. (We’ll describe worker threads in the section on Thread Pools.)
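
To make "less specific" concrete, here are two toy Executor implementations: one runs the task in the caller's thread, the other reproduces the new-thread idiom. The class names and the helper threadUsedBy are hypothetical and exist only for this sketch; the thread pools in java.util.concurrent are far more elaborate.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

public class SimpleExecutors {

    // Runs each task synchronously in the caller's thread.
    public static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable r) {
            r.run();
        }
    }

    // Same effect as (new Thread(r)).start(): one new thread per task.
    public static class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable r) {
            new Thread(r).start();
        }
    }

    // Returns the name of the thread the given executor used to run one task.
    public static String threadUsedBy(Executor e) throws InterruptedException {
        final String[] name = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        e.execute(() -> {
            name[0] = Thread.currentThread().getName();
            done.countDown();
        });
        done.await(); // wait until the task has run
        return name[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("direct:          " + threadUsedBy(new DirectExecutor()));
        System.out.println("thread per task: " + threadUsedBy(new ThreadPerTaskExecutor()));
    }
}
```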

The executor implementations in java.util.concurrent are designed to make full use of the more advanced ExecutorService and ScheduledExecutorService interfaces, although they also work with the base Executor interface.

 

The ExecutorService Interface

The ExecutorService interface supplements execute with a similar, but more versatile submit method. Like execute, submit accepts Runnable objects, but also accepts Callable objects, which allow the task to return a value. The submit method returns a Future object, which is used to retrieve the Callable return value and to manage the status of both Callable and Runnable tasks.

ExecutorService also provides methods for submitting large collections of Callable objects. Finally, ExecutorService provides a number of methods for managing the shutdown of the executor. To support immediate shutdown, tasks should handle interrupts correctly.


Executor is the root interface, with a single execute method. Anything that implements the Runnable interface can be passed as a parameter. Executor, however, has no support for Callable.

 


The ScheduledExecutorService Interface

The ScheduledExecutorService interface supplements the methods of its parent ExecutorService with schedule, which executes a Runnable or Callable task after a specified delay. In addition, the interface defines scheduleAtFixedRate and scheduleWithFixedDelay, which execute specified tasks repeatedly, at defined intervals.
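
A small sketch of both styles of scheduling follows. The class name SchedulingDemo and the delays (50 ms and 20 ms) are arbitrary choices made for illustration. It schedules one delayed Callable and one repeating Runnable, waits for three repetitions, and then cancels the repeating task, since periodic tasks otherwise keep running until the executor is shut down.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SchedulingDemo {

    public static String run() throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            // One-shot Callable, executed once after a 50 ms delay.
            Callable<String> task = () -> "delayed result";
            ScheduledFuture<String> delayed =
                    scheduler.schedule(task, 50, TimeUnit.MILLISECONDS);

            // Repeating Runnable: first run immediately, then every 20 ms.
            CountDownLatch threeRuns = new CountDownLatch(3);
            ScheduledFuture<?> periodic = scheduler.scheduleAtFixedRate(
                    threeRuns::countDown, 0, 20, TimeUnit.MILLISECONDS);

            threeRuns.await();      // wait for three executions
            periodic.cancel(false); // stop the repetition
            return delayed.get();   // blocks until the delayed task has run
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```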

Creating and Managing ThreadPool in Java :

Managing the thread life cycle is expensive, and that is how Executors came into the picture. In complicated or real-life applications, allocating and deallocating memory for multiple threads becomes even more complicated (compared to a single thread). To handle this, Java introduced the concept of thread pools.

Thread pool consists of worker threads. Tasks are submitted to a thread pool via a queue. Worker threads have a simple life : request new task from a work queue, execute it, and go back to pool for assignment to next task.

The Executors class provides factory methods that create thread pools and return an executor.

Thread pool creation and management in Java can be handled in two ways:

Using the Executors utility class factory methods:

  • ExecutorService newCachedThreadPool() creates a thread pool that creates new threads as needed, but which reuses previously constructed threads when they’re available. Threads that haven’t been used for 60 seconds are terminated and removed from the cache. This thread pool typically improves the performance of programs that execute many short-lived asynchronous tasks.
  • ExecutorService newSingleThreadExecutor() creates an executor that uses a single worker thread operating off an unbounded queue — tasks are added to the queue and execute sequentially (no more than one task is active at any one time). If this thread terminates through failure during execution before shutdown of the executor, a new thread will be created to take its place when subsequent tasks need to be executed.
  • ExecutorService newFixedThreadPool(int nThreads) creates a thread pool that re-uses a fixed number of threads operating off a shared unbounded queue. At most nThreads threads are actively processing tasks. If additional tasks are submitted when all threads are active, they wait in the queue until a thread is available. If any thread terminates through failure during execution before shutdown, a new thread will be created to take its place when subsequent tasks need to be executed. The pool’s threads exist until the executor is shut down.
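
The three factory methods can be compared with a small experiment. The helper below (runOrder, a name invented for this sketch) submits numbered tasks and records the order in which they actually ran. With newSingleThreadExecutor() the order always matches the submission order; with the other two pools all tasks run, but the order may vary from run to run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FactoryMethodsDemo {

    // Submits tasks 0..n-1 and returns the order in which they actually ran.
    public static List<Integer> runOrder(ExecutorService executor, int n)
            throws InterruptedException {
        List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());
        for (int i = 0; i < n; i++) {
            final int id = i;
            executor.execute(() -> order.add(id));
        }
        executor.shutdown();                             // stop accepting tasks
        executor.awaitTermination(10, TimeUnit.SECONDS); // let queued tasks finish
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("single: " + runOrder(Executors.newSingleThreadExecutor(), 5));
        System.out.println("fixed:  " + runOrder(Executors.newFixedThreadPool(3), 5));
        System.out.println("cached: " + runOrder(Executors.newCachedThreadPool(), 5));
    }
}
```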

The Executor framework offers additional types (such as the ScheduledExecutorService interface).

Creating customized thread pool executor :

If none of the executors provided by the above factory methods meet your needs, constructing instances of java.util.concurrent.ThreadPoolExecutor or java.util.concurrent.ScheduledThreadPoolExecutor will give you additional options.

The ThreadPoolExecutor has several constructors available. For instance:


int  corePoolSize  =    5;
int  maxPoolSize   =   10;
long keepAliveTime = 5000;

ExecutorService threadPoolExecutor =
        new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()
                );

However, unless you need to specify all these parameters explicitly for your ThreadPoolExecutor, it is often easier to use one of the factory methods in the java.util.concurrent.Executors class.

Future Results

When you ask an executor service to run a callable object, the service returns a Future object that allows you to retrieve those results, monitor the status of the task, and cancel the task. The Future interface looks like this:

public interface Future<V> {
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    boolean isDone();
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
}

Callable and future objects have a one-to-one correspondence: every callable object that is sent to an executor service returns a matching future object. The get() method of the future object returns the results of its corresponding call( ) method. The get() method blocks until the call() method has returned (or until the optional timeout has expired). If the call() method throws an exception, the get() method throws an ExecutionException with an embedded cause, which is the exception thrown by the call( ) method.
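
Both behaviours of get() described above, the wrapped exception and the optional timeout, can be tried out with a sketch like this one. The class name FutureGetDemo, the "boom" message, and the durations are assumptions chosen for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureGetDemo {

    // Returns the message of the exception thrown inside call(),
    // as seen by the thread that calls get().
    public static String causeOfFailure() throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> failing = () -> {
                throw new IllegalStateException("boom");
            };
            Future<Integer> future = exec.submit(failing);
            try {
                future.get();
                return "no failure";
            } catch (ExecutionException e) {
                return e.getCause().getMessage(); // the original exception
            }
        } finally {
            exec.shutdown();
        }
    }

    // Returns true if get(timeout) gave up before the slow task finished.
    public static boolean timesOut() throws Exception {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Callable<String> slow = () -> {
                Thread.sleep(2000);
                return "late";
            };
            Future<String> future = exec.submit(slow);
            try {
                future.get(50, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the sleeping task
                return true;
            }
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("cause = " + causeOfFailure());
        System.out.println("timed out = " + timesOut());
    }
}
```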

The future object keeps track of the state of an embedded Callable object. The state is set to cancelled when the cancel() method is called. When the task is about to run, the future checks the state first: if the state is cancelled, it returns immediately without invoking the call() method.

When the cancel() method is called, the corresponding callable object may be in one of three states. It may be waiting for execution, in which case its state is set to cancelled and the call() method is never executed. It may have completed execution, in which case the cancel( ) method has no effect. The object may be in the process of running. In that case, if the mayInterruptIfRunning flag is false, the cancel() method again has no effect.

If the mayInterruptIfRunning flag is true, however, the thread running the callable object is interrupted. The callable object must still pay attention to this, periodically calling the Thread.interrupted() method to see if it should exit.
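
A task that cooperates with cancel(true) might look like the sketch below. The names CancelDemo and spinner are hypothetical, and Thread.yield() merely stands in for a unit of real work. The latch started makes sure the task is actually running before it is cancelled, so the interrupt path is the one being exercised.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelDemo {

    // Returns { cancel() result, isCancelled(), task noticed the interrupt }.
    public static boolean[] cancelRunningTask() throws Exception {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch started = new CountDownLatch(1);
            CountDownLatch sawInterrupt = new CountDownLatch(1);
            Callable<Void> spinner = () -> {
                started.countDown();
                // Cooperative cancellation: poll the interrupt flag while working.
                while (!Thread.interrupted()) {
                    Thread.yield(); // stand-in for a unit of real work
                }
                sawInterrupt.countDown();
                return null;
            };
            Future<Void> future = exec.submit(spinner);
            started.await();                         // the task is now running
            boolean cancelled = future.cancel(true); // true: interrupt the worker
            boolean noticed = sawInterrupt.await(5, TimeUnit.SECONDS);
            return new boolean[] { cancelled, future.isCancelled(), noticed };
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = cancelRunningTask();
        System.out.println("cancelled=" + r[0] + " isCancelled=" + r[1] + " noticed=" + r[2]);
    }
}
```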

When an object in a thread pool is cancelled, there is no immediate effect: the object still remains queued for execution. When the thread pool is about to execute the object, it checks the object’s internal state, sees that it has been cancelled, and skips execution of the object. So, cancelling an object on a thread pool queue does not immediately make space in the thread pool’s queue. Future calls to the execute() method may still be rejected, even though cancelled objects are on the thread pool’s queue: the execute() method does not check the queue for cancelled objects.

One way to deal with this situation is to call the purge() method on the thread pool. The purge() method looks over the entire queue and removes any cancelled objects. One caveat applies: if a second thread attempts to add something to the pool (using the execute() method) at the same time the first thread is attempting to purge the queue, the attempt to purge the queue fails and the canceled objects remain in the queue.

A better way to cancel objects with thread pools is to use the remove() method of the thread pool, which immediately removes the task from the thread pool queue. The remove() method can be used with standard runnable objects.

The FutureTask Class

You can associate a Runnable object with a future result using the FutureTask class:

public class FutureTask<V> implements Future<V>, Runnable {}

This class is used internally by the executor service: the object returned from the submit() method of an executor service is an instance of this class. However, you can use this class directly in programs as well. This makes sense when you need to monitor the status of a runnable object within an executor: you can construct a future task with an embedded runnable object and send the future task to the execute() method of an executor (or an executor service). You can then use the methods of the Future interface to monitor the status of the run() method of the embedded runnable object.

A FutureTask object can hold either an embedded runnable or callable object, depending on which constructor is used to instantiate the object:

public FutureTask(Callable<V> task);
public FutureTask(Runnable task, V result);

The get() method of a future task that embeds a callable task returns whatever is returned by the call() method of that embedded object. The get() method of a future task that embeds a runnable object returns the result object that was passed to the constructor of the future task.
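
Both constructors can be exercised in a few lines. This sketch uses made-up names (FutureTaskDemo, fromCallable, fromRunnable) and sends the future tasks to execute(), which is possible because a FutureTask is itself a Runnable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

    public static String run() throws Exception {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            // Embeds a Callable: get() returns what call() returns.
            Callable<Integer> computation = () -> 6 * 7;
            FutureTask<Integer> fromCallable = new FutureTask<Integer>(computation);

            // Embeds a Runnable: get() returns the result given to the constructor.
            boolean[] ran = new boolean[1];
            Runnable sideEffect = () -> ran[0] = true;
            FutureTask<String> fromRunnable =
                    new FutureTask<String>(sideEffect, "finished");

            // A FutureTask is a Runnable, so execute() accepts it directly.
            exec.execute(fromCallable);
            exec.execute(fromRunnable);

            return fromCallable.get() + ":" + fromRunnable.get() + ":" + ran[0];
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "42:finished:true"
    }
}
```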

 

Algorithm for using an Executor

1. Create an Executor

You first create an instance of an Executor or ExecutorService in some global context (such as the application context for a servlet container).

The Executors class has a number of convenience static factory methods that create an ExecutorService. For instance, newFixedThreadPool() returns a ThreadPoolExecutor instance which is initialized with an unbounded queue and a fixed number of threads, while newCachedThreadPool() returns a ThreadPoolExecutor instance initialized with a SynchronousQueue (a hand-off queue with no capacity) and an effectively unbounded number of threads. In the latter case, existing threads are reused if available, and if no free thread is available, a new one is created and added to the pool. Threads that have been idle for longer than a timeout period will be removed from the pool.

private static final Executor executor = Executors.newFixedThreadPool(10);

Rather than use these convenience methods, you might find it more appropriate to instantiate your own fully customized version of ThreadPoolExecutor, using one of its many constructors.

private static final Executor executor = new ThreadPoolExecutor(10, 10, 50000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100));

This creates a bounded queue of size 100, with a thread pool of fixed size 10.

2. Create one or more tasks

You are required to have one or more tasks to be performed as instances of either Runnable or Callable.

3. Submit the task to the Executor

Once you have an ExecutorService, you can submit a task to it using either the submit() or execute() methods, and a free thread from the pool will automatically dequeue the task and execute it.

Methods in ExecutorService
There are a few different ways to delegate tasks for execution to an ExecutorService:
• execute(Runnable)
• submit(Runnable)
• submit(Callable)
• invokeAny(…)
• invokeAll(…)

Take a look at each of these methods in the following sections.

execute(Runnable)
The execute(Runnable) method takes a java.lang.Runnable object, and executes it asynchronously. Here is an example of executing a Runnable with an ExecutorService:


ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.execute(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});

executorService.shutdown();

There is no way of obtaining the result of the executed Runnable, if necessary. You will have to use a Callable for that (explained in the following sections).

submit(Runnable)
The submit(Runnable) method also takes a Runnable implementation, but returns a Future object. This Future object can be used to check if the Runnable has finished executing.
Here is an ExecutorService submit() example:


Future<?> future = executorService.submit(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});

future.get();  // blocks, then returns null if the task has finished correctly

submit(Callable)
The submit(Callable) method is similar to the submit(Runnable) method except for the type of parameter it takes. The Callable instance is very similar to a Runnable except that its call() method can return a result. The Runnable.run() method cannot return a result.
The Callable’s result can be obtained via the Future object returned by the submit(Callable) method. Here is an ExecutorService Callable example:


Future<String> future = executorService.submit(new Callable<String>() {
    public String call() throws Exception {
        System.out.println("Asynchronous Callable");
        return "Callable Result";
    }
});

System.out.println("future.get() = " + future.get());

The above code example will output this:

Asynchronous Callable
future.get() = Callable Result

invokeAny()
The invokeAny() method takes a collection of Callable objects, or subinterfaces of Callable. Invoking this method does not return a Future, but returns the result of one of the Callable objects that completed successfully. You have no guarantee about which Callable’s result you get, only that it is one of those that finished.
When invokeAny() returns, whether normally or by throwing an exception, the tasks that have not completed are cancelled.
Here is a code example:

ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});

String result = executorService.invokeAny(callables);

System.out.println("result = " + result);

executorService.shutdown();

This code example will print out the object returned by one of the Callables in the given collection. I have tried running it a few times, and the result changes. Sometimes it is “Task 1”, sometimes “Task 2”, etc.

invokeAll()
The invokeAll() method invokes all of the Callable objects you pass to it in the collection passed as parameter. invokeAll() returns a list of Future objects via which you can obtain the results of the executions of each Callable.
Keep in mind that a task might finish due to an exception, so it may not have “succeeded”. The isDone() method of a Future cannot tell the difference; calling get() on the Future of a failed task throws an ExecutionException.
Here is a code example:

ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});

List<Future<String>> futures = executorService.invokeAll(callables);

for(Future<String> future : futures){
    System.out.println("future.get = " + future.get());
}

executorService.shutdown();

execute() vs submit(): the only difference is that all the flavors of submit() return a Future that can be used to query the result of the submitted task execution.
On the other hand, execute() does not return a Future.
There is no other difference in the implementation; in fact, in the standard library implementations submit() wraps the task in a FutureTask and internally calls execute().

4. Execute the task and get the result
As soon as we invoke the submit() method of ExecutorService, the Callables are handed over to the ExecutorService for execution. One thing to note here is that submit() is not blocking. So all of our Callables are submitted right away, and the ExecutorService decides when to execute which Callable. For each Callable we get a Future object from which to get the result later.
Once the Callables are submitted for execution, we iterate through all the Futures and invoke the get() method of each Future to get the result. Remember that get() is a blocking method. So the first invocation of get() waits until that Callable has finished. Once we have received the result of the first Callable, the loop waits for the next Callable to finish, and so on. Since get() is blocking, we could put the iteration on a separate thread to continue the execution, and get the results later when all Futures are ready.

There is also another useful method on Future named isDone(), which lets us check whether the Callable for that Future has finished.

5. Shutdown the Executor
When you are done using the ExecutorService you should shut it down, so the threads do not keep running.
For instance, if your application is started via a main() method and your main thread exits your application, the application will keep running if you have an active ExecutorService in your application. The active threads inside this ExecutorService prevent the JVM from shutting down.
To terminate the threads inside the ExecutorService you call its shutdown() method. The ExecutorService will not shut down immediately, but it will no longer accept new tasks, and once all threads have finished their current tasks, the ExecutorService shuts down. All tasks submitted to the ExecutorService before shutdown() is called are executed.
If you want to shut down the ExecutorService immediately, you can call the shutdownNow() method. This will attempt to stop all executing tasks right away, and skips all submitted but non-processed tasks, which it returns as a list. There are no guarantees given about the executing tasks. Perhaps they stop, perhaps they execute until the end. It is a best-effort attempt.
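
The two methods are often combined into a two-phase shutdown: ask politely with shutdown(), wait a while, and fall back to shutdownNow() if tasks are still running. The following is a sketch of that pattern; the class name ShutdownDemo and the 5-second timeouts are assumptions that a real application would tune.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {

    // Returns true if the pool terminated within the allowed time.
    public static boolean shutdownAndAwait(ExecutorService pool) {
        pool.shutdown(); // no new tasks; already submitted tasks still run
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt running tasks, drop queued ones
                return pool.awaitTermination(5, TimeUnit.SECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> System.out.println("task"));
        System.out.println("terminated = " + shutdownAndAwait(pool));
    }
}
```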

Example :
With Runnable :

package test.exec;

public class MyRunnable implements Runnable {
	private final long countUntil;

	MyRunnable(long countUntil) {
		this.countUntil = countUntil;
	}

	@Override
	public void run() {
		long sum = 0;
		System.out.println(Thread.currentThread().getName()+" : Start");
		for (long i = 1; i < countUntil; i++) {
			sum += i;
		}
		System.out.println(Thread.currentThread().getName() +" : "+ sum+" : End");
	}
}

package test.exec;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RunnableExecMain {
	private static final int NTHREADS = 10;

	public static void main(String[] args) throws InterruptedException {
		ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);
		for (int i = 0; i < 500; i++) {
			Runnable worker = new MyRunnable(10000000L + i);
			executor.execute(worker);
		}
		// Stop accepting new tasks; tasks already in the queue still run
		executor.shutdown();
		// Block until all tasks are finished instead of spinning on isTerminated()
		while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
			// still running; keep waiting
		}
		System.out.println("Finished all threads");
	}
}

With Callable :

package test.exec;

import java.util.concurrent.Callable;

public class MyCallable implements Callable<Long> {
	@Override
	public Long call() throws Exception {
		long sum = 0;
		for (long i = 0; i <= 100; i++) {
			sum += i;
		}
		return sum;
	}
}

package test.exec;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CallableExecMain {
	private static final int NTHREADS = 10;

	public static void main(String[] args) throws InterruptedException {

		ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);
		List<Future<Long>> list = new ArrayList<Future<Long>>();
		for (int i = 0; i < 20000; i++) {
			Callable<Long> worker = new MyCallable();
			Future<Long> submit = executor.submit(worker);
			list.add(submit);
		}
		long sum = 0;
		System.out.println(list.size());
		// Now retrieve the results; get() blocks until each task is done
		for (Future<Long> future : list) {
			try {
				sum += future.get();
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		System.out.println(sum);
		executor.shutdown();
		// Block until all tasks are finished instead of spinning on isTerminated()
		while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
			// still running; keep waiting
		}
		System.out.println("Finished all threads");
	}
}