ThreadPool.java

biz/hammurapi/util/ThreadPool.java

Violations

Inspector Message Severity Location
Java Inspector 048 Copyrights information should be present in each file. 1
Java Inspector 083 Do not use printStackTrace() for exception logging. 2 120:90
Java Inspector 083 Do not use printStackTrace() for exception logging. 2 206:50
Java Inspector 089 Undocumented constructor 2 45:9
Java Inspector 089 Constructor is not properly documented 2 56:9
Java Inspector 089 Parameter exceptionSink is not documented 2 56:9
Java Inspector 089 Undocumented constructor 2 64:9
Java Inspector 089 Undocumented method 2 75:9
Java Inspector 089 Undocumented method 2 105:17
Java Inspector 089 Undocumented method 2 152:9
Java Inspector 089 Undocumented method 2 180:9
Java Inspector 089 Undocumented method 2 222:9
Java Inspector 089 Undocumented method 2 226:9
Java Inspector 089 Undocumented method 2 230:9
Java Inspector 089 Undocumented method 2 234:9
Java Inspector 089 Undocumented method 2 238:9
Java Inspector 089 Undocumented method 2 242:9
Java Inspector 089 Undocumented method 2 246:9
Java Inspector 089 Undocumented method 2 250:9
Java Inspector 089 Undocumented method 2 254:9
Java Inspector 025 Avoid hardwired numeric literals. Allowed literals: [1, -1, 0] 3 39:37
Java Inspector 025 Avoid hardwired numeric literals. Allowed literals: [1, -1, 0] 3 42:30
Java Inspector 026 Avoid hardwired string literals. Allowed literals: [] 3 89:48
Java Inspector 026 Avoid hardwired string literals. Allowed literals: [] 3 90:48
Java Inspector 026 Avoid hardwired string literals. Allowed literals: [] 3 95:32
Java Inspector 026 Avoid hardwired string literals. Allowed literals: [] 3 96:32
Java Inspector 026 Avoid hardwired string literals. Allowed literals: [] 3 137:56
Java Inspector 026 Avoid hardwired string literals. Allowed literals: [] 3 156:47
Java Inspector 026 Avoid hardwired string literals. Allowed literals: [] 3 173:63
Java Inspector 026 Avoid hardwired string literals. Allowed literals: [] 3 191:74
Java Inspector 026 Avoid hardwired string literals. Allowed literals: [] 3 213:40
Java Inspector 026 Avoid hardwired string literals. Allowed literals: [] 3 217:76
Java Inspector 042 Do not use 'notify ()'; use 'notifyAll ()' instead 3 87:48
Java Inspector 051 It is good practice to call in any case super() in a constructor. 3 45:9
Java Inspector 051 It is good practice to call in any case super() in a constructor. 3 64:9

Source code

1/*
2 * hgcommons 9
3 * Hammurapi Group Common Library
4 * Copyright (C) 2003 Hammurapi Group
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 *
20 * URL: http://www.hammurapi.biz/hammurapi-biz/ef/xmenu/hammurapi-group/products/products/hgcommons/index.html
21 * e-Mail: support@hammurapi.biz
22 */
23package biz.hammurapi.util;
24
25import java.util.LinkedList;
26
27import biz.hammurapi.config.ComponentBase;
28import biz.hammurapi.config.ConfigurationException;
29import biz.hammurapi.metrics.MeasurementConsumer;
30
31
32/**
33 * Distributes work among multiple threads.
34 * @author Pavel Vlasov
35 * @revision $Revision$
36 */
37public class ThreadPool extends ComponentBase implements Worker {
38
39 private int numberOfThreads=10;
40 private int priority=Thread.NORM_PRIORITY;
41 private ExceptionSink exceptionSink;
42 private int maxQueue=10;
43 private boolean stopped;
44
45 public ThreadPool() {
46 // Default constructor
47 }
48
49 /**
50 * @param numberOfThreads Number of threads to create.
51 * @param priority Threads priority.
52 * @param maxQueue Maximum number of jobs in execution queue. When
53 * execution queue reaches its maximum post() processes job in the invoking thread. Values <1 mean no limit.
54 * @param exceptionSink
55 */
56 public ThreadPool(int numberOfThreads, int priority, int maxQueue, ExceptionSink exceptionSink) {
57 super();
58 this.numberOfThreads=numberOfThreads;
59 this.priority=priority;
60 this.exceptionSink=exceptionSink;
61 this.maxQueue=maxQueue;
62 }
63
64 public ThreadPool(int numberOfThreads, int priority, int maxQueue, ExceptionSink exceptionSink, String name) {
65 this(numberOfThreads, priority, maxQueue, exceptionSink);
66 if (name!=null) {
67 this.name=name;
68 }
69 }
70
71 private LinkedList jobQueue=new LinkedList();
72 private int[] threads={0};
73 private String name = toString();
74
75 public boolean post(Runnable job) {
76 int queueSize;
77 synchronized (jobQueue) {
78 if (stopped) {
79 return false;
80 }
81
82 queueSize=jobQueue.size();
83
84 if (maxQueue==0 || queueSize<maxQueue) {
85 // Add job to processing queue
86 jobQueue.add(job);
87 jobQueue.notify(); // wake up one thread.
88
89 addMeasurement("queue", queueSize, 0);
90 addMeasurement("post", 1, 0);
91 return true;
92 }
93 }
94
95 addMeasurement("queue", queueSize, 0);
96 addMeasurement("post", 1, 0);
97
98 // process in current thread.
99 process(job);
100 return true;
101 }
102
103 private class WorkerThread extends Thread {
104
105 public void run() {
106 synchronized (threads) {
107 threads[0]++;
108 }
109
110 try {
111 while (true) {
112 Runnable job;
113 int queueSize;
114 synchronized (jobQueue) {
115 while (!stopped && jobQueue.isEmpty()) {
116 try {
117 jobQueue.wait();
118 } catch (InterruptedException e) {
119 if (exceptionSink==null) {
120 e.printStackTrace();
121 } else {
122 exceptionSink.consume(this, e);
123 }
124 return;
125 }
126 }
127
128 if (stopped && jobQueue.isEmpty()) {
129 return;
130 }
131
132 job=(Runnable) jobQueue.removeFirst();
133
134 queueSize=jobQueue.size();
135 }
136
137 addMeasurement("queue", queueSize, 0);
138
139 process(job);
140 }
141 } finally {
142 synchronized (threads) {
143 threads[0]--;
144 if (threads[0]<=0) {
145 threads.notifyAll();
146 }
147 }
148 }
149 }
150 }
151
152 public void start() throws ConfigurationException {
153 for (int i=0; i<numberOfThreads; i++) {
154 Thread th = new WorkerThread();
155 th.setPriority(priority);
156 th.setName(this.name +"-pool-thread-"+i);
157 th.start();
158 }
159 }
160
161 /**
162 * Creates threads to replace terminated threads.
163 * Client posting jobs to thread pool may implement liveness check routines and
164 * terminate pool threads in a job hangs. This method allows the client code
165 * to replenish the thread pool after termination of one of worker threads.
166 */
167 public void replenish() {
168 if (!stopped) {
169 synchronized (threads) {
170 for (int i=threads[0]; i<numberOfThreads; i++) {
171 Thread th = new WorkerThread();
172 th.setPriority(priority);
173 th.setName(this.name +"-pool-thread-"+i);
174 th.start();
175 }
176 }
177 }
178 }
179
180 public void stop() throws ConfigurationException {
181 synchronized (threads) {
182 stopped=true;
183 synchronized (jobQueue) {
184 jobQueue.notifyAll();
185 }
186
187 while (threads[0]>0) {
188 try {
189 threads.wait();
190 } catch (InterruptedException e) {
191 throw new ConfigurationException("Stop() interrupted", e);
192 }
193 }
194 }
195 }
196
197 /**
198 * @param job
199 */
200 private void process(Runnable job) {
201 long start=System.currentTimeMillis();
202 try {
203 job.run();
204 } catch (Exception e) {
205 if (exceptionSink==null) {
206 e.printStackTrace();
207 } else {
208 exceptionSink.consume(job, e);
209 }
210 } finally {
211 long now = System.currentTimeMillis();
212 long duration = now - start;
213 addMeasurement("run", duration, now);
214
215 // Jobs can collect metrics if they implement MeasurementConsumer.
216 if (job instanceof MeasurementConsumer) {
217 ((MeasurementConsumer) job).addMeasurement("run", duration, now);
218 }
219 }
220 }
221
222 public int getMaxQueue() {
223 return maxQueue;
224 }
225
226 public void setMaxQueue(int maxQueue) {
227 this.maxQueue = maxQueue;
228 }
229
230 public String getName() {
231 return name;
232 }
233
234 public void setName(String name) {
235 this.name = name;
236 }
237
238 public int getNumberOfThreads() {
239 return numberOfThreads;
240 }
241
242 public void setNumberOfThreads(int numberOfThreads) {
243 this.numberOfThreads = numberOfThreads;
244 }
245
246 public int getPriority() {
247 return priority;
248 }
249
250 public void setPriority(int priority) {
251 this.priority = priority;
252 }
253
254 public void setExceptionSink(ExceptionSink exceptionSink) {
255 this.exceptionSink = exceptionSink;
256 }
257
258}
259