/*
 * hammurapi-rules @mesopotamia.version@
 * Hammurapi rules engine.
 * Copyright (C) 2005 Hammurapi Group
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * URL: http://www.hammurapi.biz
 * e-Mail: support@hammurapi.biz
 */
package biz.hammurapi.dispatch;

import java.util.Collection;
import java.util.LinkedList;

import biz.hammurapi.util.Worker;

/**
 * This class puts results to a queue, which is processed by a worker.
 * An internal thread is used to push jobs to worker or execute jobs if worker is null.
 * <p>
 * Locking: the <code>queue</code> monitor guards both the queue contents and the
 * <code>shutdown</code> flag; the <code>jobCounter</code> monitor guards the number of
 * jobs which were created but have not finished yet. Whenever both monitors are held,
 * <code>queue</code> is always acquired first.
 * @author Pavel Vlasov
 * @revision $Revision$
 */
public class QueuingDispatcher extends Dispatcher {

    /** Thread which takes jobs from the queue and posts them to the worker or runs them itself. */
    private final Thread queueProcessor;

    /** Pending jobs. Also serves as the monitor guarding <code>shutdown</code>. */
    protected LinkedList queue=new LinkedList();

    /** Single-element holder of the number of unfinished jobs; also the monitor used by {@link #join()}. */
    private final int[] jobCounter={0};

    /** Set once the dispatcher is stopped or its processing thread is interrupted. Guarded by <code>queue</code>. */
    private boolean shutdown=false;

    /**
     * Constructor. Creates and starts the queue processing thread.
     * @param targets Passed to the superclass constructor.
     * @param worker Worker to delegate dispatching jobs to. Can be null.
     */
    public QueuingDispatcher(Collection targets, final Worker worker) {
        super(targets);

        queueProcessor=new Thread() {
            public void run() {
                while (true) {
                    Runnable job=takeNextJob();
                    if (job==null) {
                        // Shut down and nothing left to process.
                        return;
                    }

                    // Try to post job to worker, execute the job if worker is null or doesn't accept the job.
                    if (worker==null || !worker.post(job)) {
                        try {
                            job.run();
                        } catch (RuntimeException e) {
                            // A failing job must not kill the processing thread: otherwise
                            // the remaining jobs would never run and join() would block forever.
                            e.printStackTrace();
                        }
                    }
                }
            }
        };

        queueProcessor.setName("[Dispatch Queue Processor] "+queueProcessor.getName());
        queueProcessor.start();
    }

    /**
     * Blocks until a job is available or the dispatcher is shut down.
     * @return Next job to process, or null if the processing thread shall terminate.
     */
    private Runnable takeNextJob() {
        synchronized (queue) {
            while (!shutdown && queue.isEmpty()) {
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    shutdown=true;
                    discardQueuedJobs();
                    e.printStackTrace();
                    // Preserve interrupt status for whoever inspects this thread afterwards.
                    Thread.currentThread().interrupt();
                    return null;
                }
            }

            if (shutdown && queue.isEmpty()) {
                return null;
            }

            return (Runnable) queue.removeFirst();
        }
    }

    /**
     * Empties the queue and releases the job count held by every discarded job,
     * so that {@link #join()} doesn't wait for jobs which will never run.
     * Must be called with the <code>queue</code> monitor held.
     */
    private void discardQueuedJobs() {
        while (!queue.isEmpty()) {
            Object discarded=queue.removeFirst();
            if (discarded instanceof DispatchJob) {
                ((DispatchJob) discarded).done();
            } else if (discarded instanceof ForeignJobWrapper) {
                ((ForeignJobWrapper) discarded).done();
            }
            // Anything else was not counted by this class and is simply dropped.
        }
    }

    /**
     * Registers one more unfinished job.
     */
    private void incrementJobCounter() {
        synchronized (jobCounter) {
            ++jobCounter[0];
        }
    }

    /**
     * Marks one job as finished and wakes up threads blocked in {@link #join()}
     * when no unfinished jobs remain.
     */
    private void decrementJobCounter() {
        synchronized (jobCounter) {
            --jobCounter[0];
            if (jobCounter[0]==0) {
                jobCounter.notifyAll();
            }
        }
    }

    /**
     * Wraps a non-dispatching job so that it participates in job counting:
     * the counter is incremented on creation and decremented after the job has run.
     */
    private class ForeignJobWrapper implements Runnable {
        private final Runnable foreignJob;

        ForeignJobWrapper(Runnable foreignJob) {
            this.foreignJob=foreignJob;
            incrementJobCounter();
        }

        /**
         * Runs the wrapped job and decrements job counter.
         */
        public void run() {
            try {
                foreignJob.run();
            } finally {
                done();
            }
        }

        void done() {
            decrementJobCounter();
        }

        public String toString() {
            return "[foreign job wrapper] "+foreignJob;
        }
    }

    /**
     * Allows subclasses to post non-dispatching jobs to the queue.
     * The job is counted as unfinished until it has run, so {@link #join()} waits for it.
     * This method does not check whether the dispatcher was stopped; a job posted after
     * the processing thread has exited stays in the queue.
     * @param foreignJob Job to execute.
     */
    protected void postJobToQueue(final Runnable foreignJob) {
        synchronized (queue) {
            // Add job to queue and notify posting thread.
            queue.add(new ForeignJobWrapper(foreignJob));
            queue.notifyAll();
        }
    }

    /**
     * Job which passes its payload to the superclass <code>dispatch()</code>.
     * Creating an instance increments the job counter; {@link #done()} decrements it.
     */
    protected class DispatchJob implements Runnable {
        private Object payload;

        public Object getPayload() {
            return payload;
        }

        public DispatchJob(Object payload) {
            this.payload=payload;
            incrementJobCounter();
        }

        /**
         * Dispatches payload and decrements job counter.
         */
        public void run() {
            try {
                QueuingDispatcher.super.dispatch(payload);
            } finally {
                done();
            }
        }

        /**
         * Decrements job counter and notifies threads waiting in <code>join()</code>
         * if there are no more unfinished jobs.
         */
        public void done() {
            decrementJobCounter();
        }

        public String toString() {
            return "[dispatch job] "+payload;
        }

    }

    /**
     * Puts argument to dispatching queue.
     * The queue is processed by an internal thread, which posts dispatching jobs to worker or
     * processes them itself if worker is null. Null arguments are ignored.
     * @param arg Object to dispatch.
     * @throws IllegalStateException If the dispatcher has been shut down.
     */
    public void dispatch(final Object arg) {
        if (arg==null) {
            return;
        }

        synchronized (queue) {
            // The flag is checked under the queue lock so that a job cannot be
            // enqueued after the processing thread decided to terminate.
            if (shutdown) {
                throw new IllegalStateException("Dispatcher is shut down");
            }

            // Add job to queue and notify posting thread.
            queue.add(new DispatchJob(arg));
            queue.notifyAll();
        }
    }

    /**
     * Blocks until all jobs are processed.
     * @throws InterruptedException
     */
    public void join() throws InterruptedException {
        synchronized (jobCounter) {
            while (jobCounter[0]!=0) {
                jobCounter.wait();
            }
        }
    }

    /**
     * Stops accepting dispatches, lets the processing thread drain the queue and
     * waits for that thread to terminate. Jobs which were handed over to the worker
     * are not awaited here; use {@link #join()} to wait for them.
     * @throws InterruptedException If the calling thread is interrupted while waiting
     * for the processing thread.
     */
    public void stop() throws InterruptedException {
        synchronized (queue) {
            shutdown=true;
            // Wakes up processing thread if it is waiting on an empty queue.
            queue.notifyAll();
        }

        queueProcessor.join();
    }
}