Saturday, 24 August 2013

ThreadPoolExecutor: Creating a Thread Pool in Java


ThreadPoolExecutor is an implementation of ExecutorService that maintains a pool of threads and uses them to run submitted tasks. Without a thread pool we have no control over the number of threads created, and creating a new thread for every task consumes more system resources.
By using a thread pool we can:
  • Create only the configured number of threads
  • Reuse an existing idle thread to run a new task
  • Improve performance and manage system resources well
So if your requirement is to run many asynchronous tasks (i.e. Runnable instances), go for ThreadPoolExecutor, which runs them while consuming fewer system resources.
A ThreadPoolExecutor can be created using one of its constructors, as shown below:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

corePoolSize - Number of threads kept in the pool even when they are idle (the minimum pool size once the core threads have been started)
maximumPoolSize - Maximum number of threads allowed in the pool
keepAliveTime - If the pool has more than corePoolSize threads, the excess idle threads wait this long for a new task before terminating
unit - TimeUnit for keepAliveTime, e.g. TimeUnit.MILLISECONDS, TimeUnit.SECONDS or TimeUnit.MINUTES
workQueue - The queue that holds newly submitted tasks while all corePoolSize threads are busy
handler - The handler invoked when a submitted task is rejected by the ThreadPoolExecutor
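As a rough illustration of how these parameters fit together, here is a minimal sketch that builds a pool and reads the settings back. The class name PoolConfigSketch and the method newPool are made up for this example, and the built-in ThreadPoolExecutor.AbortPolicy handler is used only as a placeholder for a custom handler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for illustration.
public class PoolConfigSketch {

    // Builds a pool with 2 core threads, at most 5 threads,
    // a 1 second keep-alive and a bounded queue of 3 tasks.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                5,                                      // maximumPoolSize
                1, TimeUnit.SECONDS,                    // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(3),    // workQueue
                new ThreadPoolExecutor.AbortPolicy());  // handler: throws RejectedExecutionException
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println("core=" + pool.getCorePoolSize());
        System.out.println("max=" + pool.getMaximumPoolSize());
        System.out.println("keepAliveMs=" + pool.getKeepAliveTime(TimeUnit.MILLISECONDS));
        System.out.println("queueCapacity=" + pool.getQueue().remainingCapacity());
        // No thread is started until the first task is submitted.
        System.out.println("threadsStarted=" + pool.getPoolSize());
        pool.shutdown();
    }
}
```

Note that a freshly constructed pool reports a pool size of 0: threads are created on demand as tasks arrive, not in the constructor.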

For instance, let's say we create the ThreadPoolExecutor with corePoolSize as 2, queue size as 3, maximumPoolSize as 5 and keepAliveTime as 1 second. To start with, a new thread is created for each submitted task until corePoolSize (i.e. 2) threads exist. If these two threads are busy and a new task is submitted, the task is queued in the blocking queue instead of a new thread being created. If the 2 threads are still busy and the blocking queue is full (i.e. holds 3 tasks), a new thread is created for each further submitted task, up to maximumPoolSize (i.e. 5). If the queue is full and all 5 threads are busy, any newly submitted task is rejected and passed to the RejectedExecutionHandler.

If more than corePoolSize threads are running and a thread stays idle for 1 second, that thread is terminated, until only corePoolSize (i.e. 2) threads remain in the pool. Thus, once the core threads have been started, the size of the thread pool stays between corePoolSize (i.e. 2) and maximumPoolSize (i.e. 5).
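The sequence described above can be checked with a small sketch. It assumes Java 8 or later (for lambdas), and the class name SaturationSketch and method submitTenBlockedTasks are hypothetical. Every task blocks on a latch, so all ten submissions happen while the workers are busy, and the handler simply counts rejections instead of throwing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for illustration.
public class SaturationSketch {

    // Returns {poolSize, queuedTasks, rejectedTasks} observed right after 10 submissions.
    static int[] submitTenBlockedTasks() throws InterruptedException {
        AtomicInteger rejected = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                (task, executor) -> rejected.incrementAndGet()); // count rejections

        CountDownLatch release = new CountDownLatch(1);
        for (int i = 1; i <= 10; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Tasks 1-2 started core threads, 3-5 were queued,
        // 6-8 started extra threads, 9-10 were rejected.
        int[] snapshot = { pool.getPoolSize(), pool.getQueue().size(), rejected.get() };

        release.countDown(); // let the blocked tasks finish
        pool.shutdown();     // already queued tasks still run before termination
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] s = submitTenBlockedTasks();
        // prints "threads=5 queued=3 rejected=2"
        System.out.println("threads=" + s[0] + " queued=" + s[1] + " rejected=" + s[2]);
    }
}
```

The latch is only a device to make the snapshot repeatable; with tasks that finish quickly, earlier workers may free up before later submissions and the counts can differ.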

To add a task to the thread pool, use the following method of ThreadPoolExecutor:

public void execute(Runnable task)
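As a quick sketch of what execute does, the snippet below hands a single Runnable (written as a Java 8 lambda) to a one-thread pool and records which thread ran it. The class name ExecuteSketch and method runOnPool are hypothetical, and the unbounded LinkedBlockingQueue is chosen here only to keep the example short.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical class name, used only for illustration.
public class ExecuteSketch {

    // Runs one task on the pool and returns the name of the thread that executed it.
    static String runOnPool() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        AtomicReference<String> workerName = new AtomicReference<>();

        // execute() returns immediately; the task runs on a pool thread.
        pool.execute(() -> workerName.set(Thread.currentThread().getName()));

        pool.shutdown();                            // accept no new tasks
        pool.awaitTermination(5, TimeUnit.SECONDS); // wait for the submitted task to finish
        return workerName.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("task ran on:   " + runOnPool());
    }
}
```

Any object that implements Runnable can be passed to execute; the task does not need to extend a particular class.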

Let's look at some sample code to illustrate this.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {

 private ThreadPoolExecutor threadPoolExecutor = null;

 public static void main(String[] args) {
  ThreadPoolExample threadPoolExample = new ThreadPoolExample();
  threadPoolExample.createThreadPool(); // Create the thread pool
  threadPoolExample.submitTask();       // Submit the tasks
  // The pool is not shut down here, so the idle core threads keep the JVM alive.
 }

 private void createThreadPool() {
  int poolSize = 2;
  int maxPoolSize = 5;
  int queueSize = 3;
  long aliveTime = 1000; // keep-alive time in milliseconds
  ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(queueSize);
  threadPoolExecutor = new ThreadPoolExecutor(poolSize, maxPoolSize, aliveTime,
                    TimeUnit.MILLISECONDS, queue, new JobRejectionHandler());
 }

 private void submitTask()
 {
  /* Submit 10 asynchronous tasks to the thread pool */
  for (int i = 1; i <= 10; i++)
  {
   threadPoolExecutor.execute(new JobTask("Job" + i));
  }
 }

 /* An asynchronous task only needs to implement Runnable */
 class JobTask implements Runnable
 {
  private String jobId = "";

  public JobTask(String jobId)
  {
   this.jobId = jobId;
  }

  public String getJobId() {
   return jobId;
  }

  @Override
  public void run() {
   System.out.println("JobId:" + jobId + " Running through Thread:" + Thread.currentThread().getName());
   /* Make the task sleep for 5 seconds so that its thread stays busy */
   try {
    Thread.sleep(5 * 1000);
   } catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt status
   }
  }
 }

 /* RejectionHandler to handle any rejected task */
 class JobRejectionHandler implements RejectedExecutionHandler
 {
  @Override
  public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
   /* Only JobTask instances are submitted in this example, so the cast is safe */
   JobTask jobTask = (JobTask) task;
   System.out.println("JobId:" + jobTask.getJobId() + " Running through RejectionHandler, since "
                   + "there are no idle threads in ThreadPool");
  }
 }
}

Output:

JobId:Job1 Running through Thread:pool-1-thread-1
JobId:Job2 Running through Thread:pool-1-thread-2
JobId:Job9 Running through RejectionHandler, since there are no idle threads in ThreadPool
JobId:Job10 Running through RejectionHandler, since there are no idle threads in ThreadPool
JobId:Job6 Running through Thread:pool-1-thread-3
JobId:Job7 Running through Thread:pool-1-thread-4
JobId:Job8 Running through Thread:pool-1-thread-5
JobId:Job3 Running through Thread:pool-1-thread-1
JobId:Job4 Running through Thread:pool-1-thread-2
JobId:Job5 Running through Thread:pool-1-thread-3

As we can see in the above output:
  • ThreadPoolExecutor creates Thread-1 and Thread-2 to handle tasks Job1 and Job2.
  • Since these two threads were busy and the pool had already reached corePoolSize (i.e. 2), the newly submitted tasks Job3, Job4 and Job5 were queued in the BlockingQueue.
  • Since the queue was then full (i.e. it held 3 tasks), the thread pool created three more threads (maximumPoolSize is 5 and two threads were already running) to handle tasks Job6, Job7 and Job8.
  • Since the queue was full and the number of threads had reached 5 (i.e. maximumPoolSize), the submitted tasks Job9 and Job10 were rejected by the thread pool and the rejection handler was called for each of them.
  • When any of the 5 running threads becomes idle, the thread pool gives it a task waiting in the queue. Thus Job3, Job4 and Job5 were picked up by Thread-1, Thread-2 and Thread-3, as seen in the output. The exact order of the lines and the thread that runs each queued job can vary between runs.
  • When there are no more tasks to run, threads in excess of corePoolSize are terminated after they have been idle for 1 second. The pool does not guarantee which three of the five threads are terminated.
  • The two remaining threads stay alive in the thread pool, waiting for any incoming task.
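The shrinking behaviour in the last bullets can be observed with a sketch like the following. It assumes Java 8 or later; the class name KeepAliveSketch and method observeShrink are hypothetical, and a shorter keep-alive of 200 milliseconds is used so the example finishes quickly. Eight tasks fill the pool exactly (2 core threads, 3 queued, 3 extra threads), so nothing is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for illustration.
public class KeepAliveSketch {

    // Returns {pool size while all threads are busy, pool size after the pool has idled}.
    static int[] observeShrink() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(3));

        CountDownLatch release = new CountDownLatch(1);
        for (int i = 1; i <= 8; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // hold the thread until all tasks are submitted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int busySize = pool.getPoolSize();

        release.countDown(); // all tasks finish, then the threads become idle

        // Poll for up to 5 seconds until the excess threads have timed out.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > pool.getCorePoolSize() && System.nanoTime() < deadline) {
            Thread.sleep(50);
        }
        int idleSize = pool.getPoolSize();

        pool.shutdown(); // lets the remaining core threads exit so the JVM can stop
        return new int[] { busySize, idleSize };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] sizes = observeShrink();
        System.out.println("busy=" + sizes[0] + " idle=" + sizes[1]);
    }
}
```

On a typical run the pool reports 5 threads while busy and 2 once idle. Calling shutdown() at the end is what allows the program to exit; without it the core threads would keep waiting for tasks.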