/*
@license.text@
 */
package biz.hammurapi.util;

import java.util.LinkedList;

import biz.hammurapi.config.ComponentBase;
import biz.hammurapi.config.ConfigurationException;


/**
 * Distributes work among multiple threads.
 * <p>
 * Jobs are handed over through {@link #post(Runnable)} and kept in a FIFO queue
 * which is drained by worker threads created in {@link #start()}. When the queue
 * is bounded and full, the job is executed synchronously in the posting thread.
 * <p>
 * Locking: the job queue and the <code>stopped</code> flag are guarded by the
 * <code>jobQueue</code> monitor; the live worker counter is guarded by the
 * <code>threads</code> monitor. <code>stop()</code> acquires <code>threads</code>
 * first and <code>jobQueue</code> second; no other code path holds both.
 * The configuration setters are not synchronized.
 *
 * @author Pavel Vlasov
 * @revision $Revision$
 */
public class ThreadPool extends ComponentBase implements Worker {

    private int numberOfThreads=10;
    private int priority=Thread.NORM_PRIORITY;
    private ExceptionSink exceptionSink;
    private int maxQueue=10;

    /** Written while holding both monitors (see stop()), so it may be read under either of them. */
    private boolean stopped;

    /** Pending jobs in FIFO order; also the monitor idle workers wait on. */
    private final LinkedList jobQueue=new LinkedList();

    /** Single-element holder of the live worker count; also the monitor stop() waits on. */
    private final int[] threads={0};

    /** Prefix for worker thread names; defaults to this object's toString(). */
    private String name = toString();

    public ThreadPool() {
        // Default constructor
    }

    /**
     * @param numberOfThreads Number of threads to create.
     * @param priority Threads priority.
     * @param maxQueue Maximum number of jobs in execution queue. When
     * execution queue reaches its maximum post() processes job in the invoking thread.
     * Values less than 1 mean no limit.
     * @param exceptionSink Receives exceptions thrown by jobs and interruptions of
     * worker threads. If null, stack traces are printed instead.
     */
    public ThreadPool(int numberOfThreads, int priority, int maxQueue, ExceptionSink exceptionSink) {
        super();
        this.numberOfThreads=numberOfThreads;
        this.priority=priority;
        this.exceptionSink=exceptionSink;
        this.maxQueue=maxQueue;
    }

    /**
     * Same as the four-argument constructor, with a custom thread name prefix.
     * @param name Prefix for worker thread names. If null, the default is kept.
     */
    public ThreadPool(int numberOfThreads, int priority, int maxQueue, ExceptionSink exceptionSink, String name) {
        this(numberOfThreads, priority, maxQueue, exceptionSink);
        if (name!=null) {
            this.name=name;
        }
    }

    /**
     * Submits a job for execution.
     * If the queue has room (or is unbounded) the job is queued and one idle worker
     * is woken up; otherwise the job is executed in the calling thread before this
     * method returns. In both cases a "queue" measurement (queue size observed before
     * adding the job) and a "post" measurement are reported through addMeasurement().
     *
     * @param job Job to execute.
     * @return false if the pool has been stopped and the job was rejected, true otherwise.
     */
    public boolean post(Runnable job) {
        int queueSize;
        boolean queued;
        synchronized (jobQueue) {
            if (stopped) {
                return false;
            }

            queueSize=jobQueue.size();

            // maxQueue < 1 means "no limit", as documented on the constructor.
            queued = maxQueue<1 || queueSize<maxQueue;
            if (queued) {
                jobQueue.add(job);
                jobQueue.notify(); // wake up one thread.
            }
        }

        // Reported outside the critical section so that the queue lock
        // is not held while calling into measurement code.
        addMeasurement("queue", queueSize, 0);
        addMeasurement("post", 1, 0);

        if (!queued) {
            // Queue is full - process in current thread.
            process(job);
        }
        return true;
    }

    /**
     * Worker which takes jobs from the queue until the pool is stopped and the
     * queue is drained, or until the thread is interrupted while waiting for a job.
     * The live worker counter is incremented by startWorker() before the thread
     * is started; this class only decrements it on exit.
     */
    private class WorkerThread extends Thread {

        public void run() {
            try {
                while (true) {
                    Runnable job;
                    int queueSize;
                    synchronized (jobQueue) {
                        while (!stopped && jobQueue.isEmpty()) {
                            try {
                                jobQueue.wait();
                            } catch (InterruptedException e) {
                                if (exceptionSink==null) {
                                    e.printStackTrace();
                                } else {
                                    exceptionSink.consume(this, e);
                                }
                                // Restore the interrupt status swallowed by wait().
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }

                        // Exit only when stopped AND nothing is left to do,
                        // so jobs queued before stop() are still executed.
                        if (stopped && jobQueue.isEmpty()) {
                            return;
                        }

                        job=(Runnable) jobQueue.removeFirst();

                        queueSize=jobQueue.size();
                    }

                    addMeasurement("queue", queueSize, 0);

                    process(job);
                }
            } finally {
                synchronized (threads) {
                    threads[0]--;
                    if (threads[0]<=0) {
                        // Last worker gone - release stop().
                        threads.notifyAll();
                    }
                }
            }
        }
    }

    /**
     * Creates, registers and starts one worker thread.
     * The worker is counted before it is started so that replenish() and stop()
     * never observe a thread which is already created but not yet counted.
     *
     * @param index Number used as suffix of the thread name.
     */
    private void startWorker(int index) {
        Thread th = new WorkerThread();
        th.setPriority(priority);
        th.setName(this.name +"-pool-thread-"+index);

        synchronized (threads) {
            threads[0]++;
        }

        boolean started = false;
        try {
            th.start();
            started = true;
        } finally {
            if (!started) {
                // Thread.start() failed - undo the registration.
                synchronized (threads) {
                    threads[0]--;
                    if (threads[0]<=0) {
                        threads.notifyAll();
                    }
                }
            }
        }
    }

    /**
     * Starts numberOfThreads worker threads.
     */
    public void start() throws ConfigurationException {
        for (int i=0; i<numberOfThreads; i++) {
            startWorker(i);
        }
    }

    /**
     * Creates threads to replace terminated threads.
     * Client posting jobs to thread pool may implement liveness check routines and
     * terminate pool threads if a job hangs. This method allows the client code
     * to replenish the thread pool after termination of one of worker threads.
     * Does nothing if the pool has been stopped.
     */
    public void replenish() {
        synchronized (threads) {
            // stop() sets the flag while holding this monitor, so the check
            // and the thread creation below cannot interleave with it.
            if (stopped) {
                return;
            }

            for (int i=threads[0]; i<numberOfThreads; i++) {
                startWorker(i);
            }
        }
    }

    /**
     * Stops accepting new jobs, lets workers drain the queue and waits
     * until all worker threads have terminated.
     *
     * @throws ConfigurationException if the calling thread is interrupted while
     * waiting; the interrupt status of the calling thread is preserved.
     */
    public void stop() throws ConfigurationException {
        synchronized (threads) {
            synchronized (jobQueue) {
                stopped=true;
                jobQueue.notifyAll(); // wake up idle workers so they can exit.
            }

            while (threads[0]>0) {
                try {
                    threads.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new ConfigurationException("Stop() interrupted", e);
                }
            }
        }
    }

    /**
     * Runs the job in the current thread. Exceptions thrown by the job are passed
     * to the exception sink (or printed if there is no sink) and do not propagate.
     * A "run" measurement with the elapsed milliseconds and the completion time
     * is reported whether or not the job failed.
     *
     * @param job Job to run.
     */
    private void process(Runnable job) {
        long start=System.currentTimeMillis();
        try {
            job.run();
        } catch (Exception e) {
            if (exceptionSink==null) {
                e.printStackTrace();
            } else {
                exceptionSink.consume(job, e);
            }
        } finally {
            long now = System.currentTimeMillis();
            addMeasurement("run", now - start, now);
        }
    }

    public int getMaxQueue() {
        return maxQueue;
    }

    /**
     * @param maxQueue Maximum queue size; values less than 1 mean no limit.
     */
    public void setMaxQueue(int maxQueue) {
        this.maxQueue = maxQueue;
    }

    public String getName() {
        return name;
    }

    /**
     * @param name Prefix for names of worker threads created after this call.
     */
    public void setName(String name) {
        this.name = name;
    }

    public int getNumberOfThreads() {
        return numberOfThreads;
    }

    public void setNumberOfThreads(int numberOfThreads) {
        this.numberOfThreads = numberOfThreads;
    }

    public int getPriority() {
        return priority;
    }

    /**
     * @param priority Priority for worker threads created after this call.
     */
    public void setPriority(int priority) {
        this.priority = priority;
    }

    public void setExceptionSink(ExceptionSink exceptionSink) {
        this.exceptionSink = exceptionSink;
    }

}