001 /*
002 @license.text@
003 */
004 package biz.hammurapi.util;
005
006 import java.util.LinkedList;
007
008 import biz.hammurapi.config.ComponentBase;
009 import biz.hammurapi.config.ConfigurationException;
010
011
012 /**
013 * Distributes work among multiple threads.
014 * @author Pavel Vlasov
015 * @revision $Revision$
016 */
017 public class ThreadPool extends ComponentBase implements Worker {
018
019 private int numberOfThreads=10;
020 private int priority=Thread.NORM_PRIORITY;
021 private ExceptionSink exceptionSink;
022 private int maxQueue=10;
023 private boolean stopped;
024
025 public ThreadPool() {
026 // Default constructor
027 }
028
029 /**
030 * @param numberOfThreads Number of threads to create.
031 * @param priority Threads priority.
032 * @param maxQueue Maximum number of jobs in execution queue. When
033 * execution queue reaches its maximum post() processes job in the invoking thread. Values <1 mean no limit.
034 * @param exceptionSink
035 */
036 public ThreadPool(int numberOfThreads, int priority, int maxQueue, ExceptionSink exceptionSink) {
037 super();
038 this.numberOfThreads=numberOfThreads;
039 this.priority=priority;
040 this.exceptionSink=exceptionSink;
041 this.maxQueue=maxQueue;
042 }
043
044 public ThreadPool(int numberOfThreads, int priority, int maxQueue, ExceptionSink exceptionSink, String name) {
045 this(numberOfThreads, priority, maxQueue, exceptionSink);
046 if (name!=null) {
047 this.name=name;
048 }
049 }
050
051 private LinkedList jobQueue=new LinkedList();
052 private int[] threads={0};
053 private String name = toString();
054
055 public boolean post(Runnable job) {
056 int queueSize;
057 synchronized (jobQueue) {
058 if (stopped) {
059 return false;
060 }
061
062 queueSize=jobQueue.size();
063
064 if (maxQueue==0 || queueSize<maxQueue) {
065 // Add job to processing queue
066 jobQueue.add(job);
067 jobQueue.notify(); // wake up one thread.
068
069 addMeasurement("queue", queueSize, 0);
070 addMeasurement("post", 1, 0);
071 return true;
072 }
073 }
074
075 addMeasurement("queue", queueSize, 0);
076 addMeasurement("post", 1, 0);
077
078 // process in current thread.
079 process(job);
080 return true;
081 }
082
083 private class WorkerThread extends Thread {
084
085 public void run() {
086 synchronized (threads) {
087 threads[0]++;
088 }
089
090 try {
091 while (true) {
092 Runnable job;
093 int queueSize;
094 synchronized (jobQueue) {
095 while (!stopped && jobQueue.isEmpty()) {
096 try {
097 jobQueue.wait();
098 } catch (InterruptedException e) {
099 if (exceptionSink==null) {
100 e.printStackTrace();
101 } else {
102 exceptionSink.consume(this, e);
103 }
104 return;
105 }
106 }
107
108 if (stopped && jobQueue.isEmpty()) {
109 return;
110 }
111
112 job=(Runnable) jobQueue.removeFirst();
113
114 queueSize=jobQueue.size();
115 }
116
117 addMeasurement("queue", queueSize, 0);
118
119 process(job);
120 }
121 } finally {
122 synchronized (threads) {
123 threads[0]--;
124 if (threads[0]<=0) {
125 threads.notifyAll();
126 }
127 }
128 }
129 }
130 }
131
132 public void start() throws ConfigurationException {
133 for (int i=0; i<numberOfThreads; i++) {
134 Thread th = new WorkerThread();
135 th.setPriority(priority);
136 th.setName(this.name +"-pool-thread-"+i);
137 th.start();
138 }
139 }
140
141 /**
142 * Creates threads to replace terminated threads.
143 * Client posting jobs to thread pool may implement liveness check routines and
144 * terminate pool threads in a job hangs. This method allows the client code
145 * to replenish the thread pool after termination of one of worker threads.
146 */
147 public void replenish() {
148 if (!stopped) {
149 synchronized (threads) {
150 for (int i=threads[0]; i<numberOfThreads; i++) {
151 Thread th = new WorkerThread();
152 th.setPriority(priority);
153 th.setName(this.name +"-pool-thread-"+i);
154 th.start();
155 }
156 }
157 }
158 }
159
160 public void stop() throws ConfigurationException {
161 synchronized (threads) {
162 stopped=true;
163 synchronized (jobQueue) {
164 jobQueue.notifyAll();
165 }
166
167 while (threads[0]>0) {
168 try {
169 threads.wait();
170 } catch (InterruptedException e) {
171 throw new ConfigurationException("Stop() interrupted", e);
172 }
173 }
174 }
175 }
176
177 /**
178 * @param job
179 */
180 private void process(Runnable job) {
181 long start=System.currentTimeMillis();
182 try {
183 job.run();
184 } catch (Exception e) {
185 if (exceptionSink==null) {
186 e.printStackTrace();
187 } else {
188 exceptionSink.consume(job, e);
189 }
190 } finally {
191 long now = System.currentTimeMillis();
192 addMeasurement("run", now - start, now);
193 }
194 }
195
196 public int getMaxQueue() {
197 return maxQueue;
198 }
199
200 public void setMaxQueue(int maxQueue) {
201 this.maxQueue = maxQueue;
202 }
203
204 public String getName() {
205 return name;
206 }
207
208 public void setName(String name) {
209 this.name = name;
210 }
211
212 public int getNumberOfThreads() {
213 return numberOfThreads;
214 }
215
216 public void setNumberOfThreads(int numberOfThreads) {
217 this.numberOfThreads = numberOfThreads;
218 }
219
220 public int getPriority() {
221 return priority;
222 }
223
224 public void setPriority(int priority) {
225 this.priority = priority;
226 }
227
228 public void setExceptionSink(ExceptionSink exceptionSink) {
229 this.exceptionSink = exceptionSink;
230 }
231
232 }