001 /*
002 * hammurapi-rules @mesopotamia.version@
003 * Hammurapi rules engine.
004 * Copyright (C) 2005 Hammurapi Group
005 *
006 * This program is free software; you can redistribute it and/or
007 * modify it under the terms of the GNU Lesser General Public
008 * License as published by the Free Software Foundation; either
009 * version 2 of the License, or (at your option) any later version.
010 *
011 * This program is distributed in the hope that it will be useful,
012 * but WITHOUT ANY WARRANTY; without even the implied warranty of
013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014 * Lesser General Public License for more details.
015 *
016 * You should have received a copy of the GNU Lesser General Public
017 * License along with this library; if not, write to the Free Software
018 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
019 *
020 * URL: http://www.hammurapi.biz
021 * e-Mail: support@hammurapi.biz
022 */
023 package biz.hammurapi.dispatch;
024
025 import java.util.Collection;
026 import java.util.LinkedList;
027
028 import biz.hammurapi.util.Worker;
029
030 /**
031 * This class puts results to a queue, which is processed by a worker.
032 * An internal thread is used to push jobs to worker or execute jobs if worker is null.
033 * @author Pavel Vlasov
034 * @revision $Revision$
035 */
036 public class QueuingDispatcher extends Dispatcher {
037
038 private Thread queueProcessor;
039
040 /**
041 * Constructor
042 * @param targets
043 * @param worker Worker to delegate dispatching jobs to. Can be null.
044 */
045 public QueuingDispatcher(Collection targets, final Worker worker) {
046 super(targets);
047
048 queueProcessor=new Thread() {
049 public void run() {
050 while (true) {
051 Runnable job=null;
052 synchronized (queue) {
053 while (!shutdown && queue.isEmpty()) {
054 try {
055 queue.wait();
056 } catch (InterruptedException e) {
057 shutdown=true;
058 queue.clear();
059 e.printStackTrace();
060 return;
061 }
062 }
063
064 if (shutdown && queue.isEmpty()) {
065 return;
066 }
067
068 job=(Runnable) queue.removeFirst();
069 }
070
071 // Try to post job to worker, execute the job if worker is null or doesn't accept the job.
072 if (worker==null || !worker.post(job)) {
073 job.run();
074 }
075 }
076 }
077 };
078
079 queueProcessor.setName("[Dispatch Queue Processor] "+queueProcessor.getName());
080 queueProcessor.start();
081 }
082
083 protected LinkedList queue=new LinkedList();
084 private int[] jobCounter={0};
085
086 /**
087 * Allows subclasses to post non-dispatching jobs to the queue.
088 * @param job
089 */
090 protected void postJobToQueue(final Runnable foreignJob) {
091 synchronized (queue) {
092 Runnable job=new Runnable() {
093
094 // Increment job counter.
095 {
096 synchronized (jobCounter) {
097 ++jobCounter[0];
098 }
099
100 }
101
102 /**
103 * dispatch result and decrement job counter.
104 */
105 public void run() {
106 try {
107 foreignJob.run();
108 } finally {
109 synchronized (jobCounter) {
110 --jobCounter[0];
111 if (jobCounter[0]==0) {
112 jobCounter.notifyAll();
113 }
114 }
115 }
116 }
117
118 public String toString() {
119 return "[foreign job wrapper] "+foreignJob;
120 }
121 };
122
123 // Add job to queue and notify posting thread.
124 queue.add(job);
125 queue.notifyAll();
126 }
127 }
128
129 protected class DispatchJob implements Runnable {
130 private Object payload;
131
132 public Object getPayload() {
133 return payload;
134 }
135
136 public DispatchJob(Object payload) {
137 this.payload=payload;
138
139 synchronized (jobCounter) {
140 ++jobCounter[0];
141 }
142 }
143
144 public void run() {
145 try {
146 QueuingDispatcher.super.dispatch(payload);
147 } finally {
148 done();
149 }
150 }
151
152 public void done() {
153 synchronized (jobCounter) {
154 --jobCounter[0];
155 if (jobCounter[0]==0) {
156 jobCounter.notifyAll();
157 }
158 }
159 }
160
161 public String toString() {
162 return "[dispatch job] "+payload;
163 }
164
165 }
166
167 /**
168 * Puts argument to dispatching queue.
169 * The queue is processed by an internal thread, which posts dispatching jobs to worker or
170 * processes them itself if worker is null.
171 */
172 public void dispatch(final Object arg) {
173 if (arg!=null) {
174 if (shutdown) {
175 throw new IllegalStateException("Dispatcher is shut down");
176 }
177 synchronized (queue) {
178 // Add job to queue and notify posting thread.
179 queue.add(new DispatchJob(arg));
180 queue.notifyAll();
181 }
182 }
183 }
184
185 /**
186 * Blocks until all jobs are processed.
187 * @throws InterruptedException
188 */
189 public void join() throws InterruptedException {
190 synchronized (jobCounter) {
191 while (jobCounter[0]!=0) {
192 jobCounter.wait();
193 }
194 }
195 }
196
197 private boolean shutdown=false;
198
199 /**
200 * Processes all jobs and stops queue processing thread.
201 * @throws InterruptedException
202 *
203 */
204 public void stop() throws InterruptedException {
205 shutdown=true;
206 //join();
207
208 // Wakes up processing thread if queue is empty.
209 synchronized (queue) {
210 if (queue.isEmpty()) {
211 queue.notifyAll();
212 }
213 }
214 queueProcessor.join();
215
216 }
217 }