001    /*
002    @license.text@
003     */
004    package biz.hammurapi.util;
005    
006    import java.util.LinkedList;
007    
008    import biz.hammurapi.config.ComponentBase;
009    import biz.hammurapi.config.ConfigurationException;
010    
011    
012    /**
013     * Distributes work among multiple threads.
014     * @author Pavel Vlasov
015     * @revision $Revision$
016     */
017    public class ThreadPool extends ComponentBase implements Worker {
018            
019            private int numberOfThreads=10;
020            private int priority=Thread.NORM_PRIORITY; 
021            private ExceptionSink exceptionSink;
022            private int maxQueue=10;
023            private boolean stopped;
024            
025            public ThreadPool() {
026                    // Default constructor
027            }
028            
029            /**
030             * @param numberOfThreads Number of threads to create.
031             * @param priority Threads priority.
032             * @param maxQueue Maximum number of jobs in execution queue. When 
033             * execution queue reaches its maximum post() processes job in the invoking thread. Values <1 mean no limit.
034             * @param exceptionSink
035             */
036            public ThreadPool(int numberOfThreads, int priority, int maxQueue, ExceptionSink exceptionSink) {
037                    super();
038                    this.numberOfThreads=numberOfThreads;
039                    this.priority=priority;
040                    this.exceptionSink=exceptionSink;
041                    this.maxQueue=maxQueue;
042            }
043            
044            public ThreadPool(int numberOfThreads, int priority, int maxQueue, ExceptionSink exceptionSink, String name) {
045                    this(numberOfThreads, priority, maxQueue, exceptionSink);
046                    if (name!=null) {
047                            this.name=name;
048                    }
049            }
050    
051            private LinkedList jobQueue=new LinkedList();
052            private int[] threads={0};
053            private String name = toString();
054            
055            public boolean post(Runnable job) {
056                    int queueSize;
057                    synchronized (jobQueue) {
058                            if (stopped) {
059                                    return false;
060                            }
061                            
062                            queueSize=jobQueue.size();
063                            
064                            if (maxQueue==0 || queueSize<maxQueue) {
065                                    // Add job to processing queue
066                                    jobQueue.add(job);                      
067                                    jobQueue.notify(); // wake up one thread.
068                                    
069                                    addMeasurement("queue", queueSize, 0);
070                                    addMeasurement("post", 1, 0);
071                                    return true;
072                            }
073                    }
074                    
075                    addMeasurement("queue", queueSize, 0);
076                    addMeasurement("post", 1, 0);
077                    
078                    // process in current thread.
079                    process(job);
080                    return true;
081            }
082            
083            private class WorkerThread extends Thread {
084                    
085                    public void run() {
086                            synchronized (threads) {
087                                    threads[0]++;
088                            }
089                            
090                            try {
091                                    while (true) {
092                                            Runnable job;
093                                            int queueSize;
094                                            synchronized (jobQueue) {
095                                                    while (!stopped && jobQueue.isEmpty()) {
096                                                            try {
097                                                                    jobQueue.wait();
098                                                            } catch (InterruptedException e) {
099                                                                    if (exceptionSink==null) {
100                                                                            e.printStackTrace();
101                                                                    } else {
102                                                                            exceptionSink.consume(this, e);
103                                                                    }
104                                                                    return;
105                                                            }                                                               
106                                                    }
107                                                    
108                                                    if (stopped && jobQueue.isEmpty()) {
109                                                            return;
110                                                    }
111                                                    
112                                                    job=(Runnable) jobQueue.removeFirst();
113                                                    
114                                                    queueSize=jobQueue.size();
115                                            }       
116                                            
117                                            addMeasurement("queue", queueSize, 0);
118                                            
119                                            process(job);
120                                    }                                                                                       
121                            } finally {
122                                    synchronized (threads) {
123                                            threads[0]--;
124                                            if (threads[0]<=0) {
125                                                    threads.notifyAll();
126                                            }
127                                    }
128                            }
129                    }
130            }
131    
132            public void start() throws ConfigurationException {
133                    for (int i=0; i<numberOfThreads; i++) {
134                            Thread th = new WorkerThread();                 
135                            th.setPriority(priority);
136                            th.setName(this.name +"-pool-thread-"+i);
137                            th.start();
138                    }
139            }
140            
141            /**
142             * Creates threads to replace terminated threads.
143             * Client posting jobs to thread pool may implement liveness check routines and
144             * terminate pool threads in a job hangs. This method allows the client code
145             * to replenish the thread pool after termination of one of worker threads.
146             */
147            public void replenish() {
148                    if (!stopped) {
149                            synchronized (threads) {
150                                    for (int i=threads[0]; i<numberOfThreads; i++) {
151                                            Thread th = new WorkerThread();                 
152                                            th.setPriority(priority);
153                                            th.setName(this.name +"-pool-thread-"+i);
154                                            th.start();
155                                    }                               
156                            }
157                    }
158            }
159    
160            public void stop() throws ConfigurationException {
161                    synchronized (threads) {                        
162                            stopped=true;
163                            synchronized (jobQueue) {
164                                    jobQueue.notifyAll();
165                            }
166                            
167                            while (threads[0]>0) {
168                                    try {
169                                            threads.wait();
170                                    } catch (InterruptedException e) {
171                                            throw new ConfigurationException("Stop() interrupted", e);
172                                    }
173                            }
174                    }
175            }
176    
177            /**
178             * @param job
179             */
180            private void process(Runnable job) {
181                    long start=System.currentTimeMillis();
182                    try {
183                            job.run();
184                    } catch (Exception e) {
185                            if (exceptionSink==null) {
186                                    e.printStackTrace();
187                            } else {
188                                    exceptionSink.consume(job, e);
189                            }
190                    } finally {
191                            long now = System.currentTimeMillis();
192                            addMeasurement("run", now - start, now);
193                    }
194            }
195    
196            public int getMaxQueue() {
197                    return maxQueue;
198            }
199    
200            public void setMaxQueue(int maxQueue) {
201                    this.maxQueue = maxQueue;
202            }
203    
204            public String getName() {
205                    return name;
206            }
207    
208            public void setName(String name) {
209                    this.name = name;
210            }
211    
212            public int getNumberOfThreads() {
213                    return numberOfThreads;
214            }
215    
216            public void setNumberOfThreads(int numberOfThreads) {
217                    this.numberOfThreads = numberOfThreads;
218            }
219    
220            public int getPriority() {
221                    return priority;
222            }
223    
224            public void setPriority(int priority) {
225                    this.priority = priority;
226            }
227            
228            public void setExceptionSink(ExceptionSink exceptionSink) {
229                    this.exceptionSink = exceptionSink;
230            }
231    
232    }