Version 3

    Overview

    The JBoss 7 thread model includes blocking-bounded-queue-thread-pool, bounded-queue-thread-pool, and queueless-thread-pool.

     

    This article covers bounded-queue-thread-pool.

     

    BoundedQueueThreadPool is used for the web container's executor thread pool. The default JBoss configuration:

     

         <subsystem xmlns="urn:jboss:domain:threads:1.1">   

                <thread-factory name="web-container-thread-factory" group-name="web-container-thread-group" thread-name-pattern="web-container-thread-%t"/>   

                <bounded-queue-thread-pool name="http-executor">   

                    <core-threads count="20"/>   

                    <queue-length count="10"/>   

                    <max-threads count="100"/>   

                    <keepalive-time time="20" unit="minutes"/>   

                    <thread-factory name="web-container-thread-factory"/>   

                </bounded-queue-thread-pool>   

            </subsystem>

     

    <connector name="http" protocol="HTTP/1.1" scheme="http" socket-binding="http" executor="http-executor" />

     

    So the 'http' connector uses the 'http-executor' thread pool to handle HTTP requests.

     

    How a request is processed

    In the JBoss web container, when a new request comes in, the Acceptor thread invokes the 'JIoEndpoint.processSocket()' method to process the socket.

     

    protected boolean processSocket(Socket socket) {

             try {

                 if (executor == null) {

                     Worker worker = getWorkerThread();

                     if (worker != null) {

                         worker.assign(socket);

                     } else {

                         return false;

                     }

                 } else {

                     executor.execute(new SocketProcessor(socket));

                 }

             } catch (Throwable t) {

                 // This means we got an OOM or similar creating a thread, or that

                 // the pool and its queue are full

                 CoyoteLogger.UTIL_LOGGER.errorProcessingSocket(t);

                 return false;

             }

             return true;

         }

     

    Because our executor is not null, execution takes the `executor.execute(new SocketProcessor(socket))` branch. The `executor` is a wrapper around `BoundedQueueThreadPool`.

    `SocketProcessor(socket)` is the task that processes the socket. The processSocket method only hands the task to the thread pool.
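
    The hand-off pattern in processSocket() can be tried in isolation. The sketch below is not JBoss code: it substitutes the JDK's java.util.concurrent.ThreadPoolExecutor for the JBoss executor, and the names HandOffSketch, submit, and acceptedOutOf, as well as the small pool sizes, are assumptions made for illustration. It shows only the acceptor-side idea: pass the task to the executor and report false when the pool and its queue are full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class HandOffSketch {
    // Hypothetical stand-in for processSocket(): hand the task over, never run it here.
    static boolean submit(ThreadPoolExecutor executor, Runnable task) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            // The pool and its queue are both full.
            return false;
        }
    }

    // Submits `count` tasks that all block until released; returns how many were accepted.
    static int acceptedOutOf(int count) {
        // Assumed sizes: 2 core threads, queue length 1, at most 3 threads.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 3, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < count; i++) {
                if (submit(executor, () -> awaitQuietly(release))) {
                    accepted++;
                }
            }
        } finally {
            release.countDown();   // let the blocked tasks finish
            executor.shutdown();   // queued tasks still run before the pool ends
        }
        return accepted;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted " + acceptedOutOf(6) + " of 6 tasks");
    }
}
```

    With these assumed sizes, two tasks occupy core threads, one waits in the queue, and one gets an extra thread, so further submissions are refused until a task completes.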

     

    Now let's look at how BoundedQueueThreadPool processes a task.

     

    When a task enters the thread pool and the number of running threads is below the core size, the pool creates a new thread to handle the task. If the core threads are all in use, the pool checks whether the queue has space and, if so, puts the task in the queue.

    If the queue is full, the pool checks whether the number of running threads is below the maximum. If it is, the pool creates a new thread to handle the task. Otherwise it checks the blocking property. If blocking is true, the caller blocks until the queue has room for the task. If it is false, the task is passed directly to the hand-off executor. If the hand-off executor is null, the task is rejected.
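
    The order of these checks can be written as a small pure function. This is a minimal sketch, not the JBoss implementation: the class AdmissionSketch, the Decision enum, and the decide method are hypothetical names, and the function only reports which branch would be taken instead of starting threads or queueing anything.

```java
final class AdmissionSketch {
    enum Decision { NEW_CORE_THREAD, QUEUED, NEW_EXTRA_THREAD, BLOCK_CALLER, HAND_OFF, REJECTED }

    // Mirrors the order described above: core threads, then queue, then extra threads,
    // then blocking or hand-off.
    static Decision decide(int threadCount, int queued, int coreThreads, int queueLength,
                           int maxThreads, boolean blocking, boolean hasHandoff) {
        if (threadCount < coreThreads) {
            return Decision.NEW_CORE_THREAD;
        }
        if (queued < queueLength) {
            return Decision.QUEUED;
        }
        if (threadCount < maxThreads) {
            return Decision.NEW_EXTRA_THREAD;
        }
        if (blocking) {
            return Decision.BLOCK_CALLER;
        }
        return hasHandoff ? Decision.HAND_OFF : Decision.REJECTED;
    }

    public static void main(String[] args) {
        // Values from the http-executor configuration: core 20, queue length 10, max 100.
        System.out.println(decide(19, 0, 20, 10, 100, false, false));
        System.out.println(decide(20, 9, 20, 10, 100, false, false));
        System.out.println(decide(20, 10, 20, 10, 100, false, false));
        System.out.println(decide(100, 10, 20, 10, 100, false, false));
    }
}
```

    Note that the queue is tried before any thread beyond the core size is started, so with the http-executor values the pool grows past 20 threads only while 10 tasks are already waiting.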

     

    JBoss has no BoundedQueueThreadPool class, but it does have BoundedQueueThreadPoolService. BoundedQueueThreadPoolService is a service that can be seen as a wrapper around QueueExecutor. The core method of QueueExecutor is execute(), and its code implements the steps described above.

     

    public void execute(final Runnable task) throws RejectedExecutionException {

             if (task == null) {

                 throw new NullPointerException("task is null");

             }

             final Executor executor;

             final Lock lock = this.lock;

             lock.lock();

             try {

                 for (;;) {

                     if (stop) {

                         throw new StoppedExecutorException("Executor is stopped");

                     }

                     // Try core thread first, then queue, then extra thread

                     final int count = threadCount;

                     // if the running thread count < coreThreads, create a new thread

                     if (count < coreThreads) {

                         startNewThread(task);//create new thread

                         threadCount = count + 1;

                         return;

                     }

                     // next queue...

                     // if the queue has space, put the task in it and send a signal

                     final Queue<Runnable> queue = this.queue;

                     if (queue.offer(task)) {

                         enqueueCondition.signal();

                         return;

                     }

                     // extra threads?

                     // if the running thread count < maxThreads, create a new thread

                     if (count < maxThreads) {

                         startNewThread(task);

                         threadCount = count + 1;

                         return;

                     }

                     // blocking defaults to false

                     if (blocking) {

                         try {

                             removeCondition.await();

                         } catch (InterruptedException e) {

                             Thread.currentThread().interrupt();

                             throw new ExecutionInterruptedException("Thread interrupted");

                         }

                     } else {

                         // delegate the task outside of the lock.

                         rejectCount++;

                         executor = handoffExecutor;

                         break;

                     }

                 }

             } finally {

                 lock.unlock();

             }

             if (executor != null) {

                 executor.execute(task);

             }

             return;

         }

     

    Now look at the startNewThread() method:

     

    private void startNewThread(final Runnable task) {

             // first wrap the task in a Worker,

             // then create a thread from that Worker

             final Thread thread = threadFactory.newThread(new Worker(task));

             if (thread == null) {

                 throw new ThreadCreationException();

             }

             workers.add(thread);//put thread in workers

             final int size = workers.size();

             if (size > largestPoolSize) {

                 largestPoolSize = size;

             }

             thread.start();//start new thread

         }

    Worker is an inner class of QueueExecutor. Its run() method is the core of task execution: after running the task it was created with, it keeps looping, calling QueueExecutor.takeTask() to get the next task and Worker.runTask() to run it. While the executor is running, the loop exits only when takeTask() returns null, after which the thread ends on its own. The code is as follows:

     

    public void run() {

                final Lock lock = QueueExecutor.this.lock;

                try {

                    Runnable task = first; // the initial task this worker was created with

                    // Release reference to task

                    first = null;

                    runTask(task); // run the initial task first

                    // then loop; exit when takeTask() returns null

                    for (;;) {

                        // don't hang on to task while we possibly block waiting for the next one

                        task = null;

                        lock.lock();

                        try {

                            if (stop) {

                                // drain queue

                                if ((task = pollTask()) == null) {

                                    return;

                                }

                                Thread.currentThread().interrupt();

                            } else {

                                // get next task

                                // invoke QueueExecutor.takeTask() method to get next task

                                if ((task = takeTask()) == null) {

                                    return;

                                }

                            }

                        } finally {

                            lock.unlock();

                        }

                        runTask(task); // run the task

                        Thread.interrupted();

                    }

                } finally {

                    boolean last = false;

                    lock.lock();

                    try {

                        workers.remove(Thread.currentThread());

                        last = stop && workers.isEmpty();

                    } finally {

                        lock.unlock();

                    }

                    if (last) {

                        shutdownListenable.shutdown();

                    }

                }

            }
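
    Stripped of locking, the stop flag, and interrupt handling, the shape of this loop is small. The following is a single-threaded sketch under those simplifications: WorkerLoopSketch, runWorker, and demo are hypothetical names, and a plain queue.poll() stands in for takeTask(), so the loop ends as soon as the queue is empty instead of waiting for new work.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class WorkerLoopSketch {
    // Runs the initial task, then keeps taking tasks until the queue yields null.
    static void runWorker(Runnable first, Queue<Runnable> queue) {
        Runnable task = first;
        task.run();
        for (;;) {
            task = queue.poll();   // stand-in for takeTask(); null ends the loop
            if (task == null) {
                return;            // the worker thread would end here
            }
            task.run();
        }
    }

    // Records the order in which the worker ran its tasks.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Queue<Runnable> queue = new ArrayDeque<>();
        queue.add(() -> log.add("queued-1"));
        queue.add(() -> log.add("queued-2"));
        runWorker(() -> log.add("first"), queue);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```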

     

    The method that fetches a task, QueueExecutor.takeTask():

     

    private Runnable takeTask() {

             final Condition removeCondition = this.removeCondition;

             // try to get a task from the wait queue

             Runnable task = queue.poll();

             if (task != null) {

                 removeCondition.signal();

                 return task;

             } else {

                 // nothing was queued: loop, waiting for a new task to be enqueued

                 final Condition enqueueCondition = this.enqueueCondition;

                 final long start = System.currentTimeMillis(); // start timing for the thread keep-alive timeout

                 boolean intr = Thread.interrupted();

                 try {

                     long elapsed = 0L;

                     for (;;) {

                         // these parameters may change on each iteration

                         final int threadCount = this.threadCount;

                         final int coreThreadLimit = coreThreads;

                         final boolean allowCoreThreadTimeout = this.allowCoreThreadTimeout;

                         if (stop || threadCount > maxThreads) {

                             // too many threads.  Handle a task if there is one, otherwise exit

                             return pollTask();

                         } else if (allowCoreThreadTimeout || threadCount > coreThreadLimit) {

                             // if allowCoreThreadTimeout is true, or the thread count exceeds the core size, this thread must get a task within the keep-alive time

                             final TimeUnit timeUnit = keepAliveTimeUnit;

                             final long time = keepAliveTime;

                             final long remaining = time - timeUnit.convert(elapsed, TimeUnit.MILLISECONDS);

                             if (remaining > 0L) {

                                 try {

                                     // wait for a task to be enqueued

                                     enqueueCondition.await(remaining, timeUnit);

                                 } catch (InterruptedException e) {

                                     intr = true;

                                 }

                             } else {

                                 // the timeout has expired

                                 return pollTask();

                             }

                         } else {

                             // ignore timeout until we are not a core thread or until core threads are allowed to time out

                             try {

                                 enqueueCondition.await();

                             } catch (InterruptedException e) {

                                 intr = true;

                             }

                         }

                         task = queue.poll();

                         if (task != null) {

                             removeCondition.signal(); // wakes a caller blocked in execute() when blocking == true

                             return task;

                         }

                         elapsed = System.currentTimeMillis() - start;

                     }

                 } finally {

                     if (intr) {

                         Thread.currentThread().interrupt();

                     }

                 }

             }

         }
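
    The keep-alive arithmetic in takeTask() is worth a closer look, because the elapsed time is measured in milliseconds but converted to the keep-alive unit before it is subtracted. The sketch below reproduces just that one expression; KeepAliveSketch and remaining are hypothetical names. TimeUnit.convert truncates toward zero, so with a unit of minutes a partially elapsed minute does not yet reduce the remaining time.

```java
import java.util.concurrent.TimeUnit;

final class KeepAliveSketch {
    // Same expression as in takeTask(): time - timeUnit.convert(elapsed, MILLISECONDS).
    static long remaining(long keepAliveTime, TimeUnit unit, long elapsedMillis) {
        return keepAliveTime - unit.convert(elapsedMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // Keep-alive of 20 minutes, as in the http-executor configuration.
        long[] elapsedSamples = {0L, 59_999L, 90_000L, 1_200_000L};
        for (long elapsed : elapsedSamples) {
            System.out.println(elapsed + " ms elapsed, remaining = "
                    + remaining(20, TimeUnit.MINUTES, elapsed) + " minutes");
        }
    }
}
```

    Once the result is no longer greater than zero, takeTask() treats the timeout as expired and returns whatever pollTask() yields.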

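    How a new request wakes an idle thread
    When the current thread count is at or above the core size and a new request arrives, the task is put in the queue, and the signal on enqueueCondition moves an idle thread from the WAITING state to RUNNABLE so that it can run the task.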
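
    This wake-up relies on a lock and a condition shared by the submitting side and the idle thread. The following is a minimal sketch of that mechanism using the JDK's ReentrantLock and Condition; the class WakeUpSketch and its methods take, put, and demo are hypothetical, and strings stand in for tasks. It leaves out the keep-alive timeout and the stop flag.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class WakeUpSketch {
    private final Lock lock = new ReentrantLock();
    private final Condition enqueueCondition = lock.newCondition();
    private final Queue<String> queue = new ArrayDeque<>();

    // Idle-thread side: wait until something is queued, then take it.
    String take() throws InterruptedException {
        lock.lock();
        try {
            String item;
            while ((item = queue.poll()) == null) {
                enqueueCondition.await();   // the idle thread waits here
            }
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Submitting side: enqueue the item and signal one waiting thread.
    void put(String item) {
        lock.lock();
        try {
            queue.offer(item);
            enqueueCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    // Starts one idle thread, submits one item, and returns what the thread received.
    static String demo() {
        WakeUpSketch pool = new WakeUpSketch();
        String[] received = new String[1];
        Thread idle = new Thread(() -> {
            try {
                received[0] = pool.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        idle.start();
        pool.put("request-1");
        try {
            idle.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("idle thread received: " + demo());
    }
}
```

    Checking the queue in a loop around await() means the idle thread gets the item whether it starts waiting before or after put() is called.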