001    /*
002     * hammurapi-rules @mesopotamia.version@
003     * Hammurapi rules engine. 
004     * Copyright (C) 2005  Hammurapi Group
005     *
006     * This program is free software; you can redistribute it and/or
007     * modify it under the terms of the GNU Lesser General Public
008     * License as published by the Free Software Foundation; either
009     * version 2 of the License, or (at your option) any later version.
010     *
011     * This program is distributed in the hope that it will be useful,
012     * but WITHOUT ANY WARRANTY; without even the implied warranty of
013     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
014     * Lesser General Public License for more details.
015     *
016     * You should have received a copy of the GNU Lesser General Public
017     * License along with this library; if not, write to the Free Software
018     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
019     *
020     * URL: http://www.hammurapi.biz
021     * e-Mail: support@hammurapi.biz
022     */
023    package biz.hammurapi.dispatch;
024    
025    import java.util.Collection;
026    import java.util.LinkedList;
027    
028    import biz.hammurapi.util.Worker;
029    
030    /**
031     * This class puts results to a queue, which is processed by a worker.
032     * An internal thread is used to push jobs to worker or execute jobs if worker is null.
033     * @author Pavel Vlasov
034     * @revision $Revision$
035     */
036    public class QueuingDispatcher extends Dispatcher {
037    
038            private Thread queueProcessor;
039            
040            /**
041             * Constructor
042             * @param targets
043             * @param worker Worker to delegate dispatching jobs to. Can be null.
044             */
045            public QueuingDispatcher(Collection targets, final Worker worker) {
046                    super(targets);
047                    
048                    queueProcessor=new Thread() {
049                            public void run() {
050                                    while (true) {
051                                            Runnable job=null;
052                                            synchronized (queue) {
053                                                    while (!shutdown && queue.isEmpty()) {
054                                                            try {
055                                                                    queue.wait();
056                                                            } catch (InterruptedException e) {
057                                                                    shutdown=true;
058                                                                    queue.clear();
059                                                                    e.printStackTrace();
060                                                                    return;
061                                                            }
062                                                    }
063                                                    
064                                                    if (shutdown && queue.isEmpty()) {
065                                                            return;
066                                                    }
067                                                    
068                                                    job=(Runnable) queue.removeFirst();
069                                            }
070                                            
071                                            // Try to post job to worker, execute the job if worker is null or doesn't accept the job.
072                                            if (worker==null || !worker.post(job)) {
073                                                    job.run();
074                                            }
075                                    }
076                            }
077                    };
078                    
079                    queueProcessor.setName("[Dispatch Queue Processor] "+queueProcessor.getName());
080                    queueProcessor.start();
081            }
082    
083            protected LinkedList queue=new LinkedList();
084            private int[] jobCounter={0};
085            
086            /**
087             * Allows subclasses to post non-dispatching jobs to the queue.
088             * @param job
089             */
090            protected void postJobToQueue(final Runnable foreignJob) {
091                    synchronized (queue) {                  
092                            Runnable job=new Runnable() {
093                                    
094                                    // Increment job counter.
095                                    {
096                                            synchronized (jobCounter) {
097                                                    ++jobCounter[0];
098                                            }
099    
100                                    }
101                                    
102                                    /**
103                                     * dispatch result and decrement job counter.
104                                     */
105                                    public void run() {
106                                            try {
107                                                    foreignJob.run();
108                                            } finally {
109                                                    synchronized (jobCounter) {
110                                                            --jobCounter[0];
111                                                            if (jobCounter[0]==0) {
112                                                                    jobCounter.notifyAll();
113                                                            }
114                                                    }
115                                            }
116                                    }
117                                    
118                                    public String toString() {
119                                            return "[foreign job wrapper] "+foreignJob;
120                                    }
121                            };
122                            
123                            // Add job to queue and notify posting thread.
124                            queue.add(job);
125                            queue.notifyAll();
126                    }               
127            }
128            
129            protected class DispatchJob implements Runnable {
130                    private Object payload;
131                    
132                    public Object getPayload() {
133                            return payload;
134                    }
135                    
136                    public DispatchJob(Object payload) {
137                            this.payload=payload;
138                            
139                            synchronized (jobCounter) {
140                                    ++jobCounter[0];
141                            }
142                    }
143                    
144                    public void run() {
145                            try {
146                                    QueuingDispatcher.super.dispatch(payload);
147                            } finally {
148                                    done();
149                            }
150                    }
151    
152                    public void done() {
153                            synchronized (jobCounter) {
154                                    --jobCounter[0];
155                                    if (jobCounter[0]==0) {
156                                            jobCounter.notifyAll();
157                                    }
158                            }
159                    }
160                    
161                    public String toString() {
162                            return "[dispatch job] "+payload;
163                    }
164                    
165            }
166            
167            /**
168             * Puts argument to dispatching queue.
169             * The queue is processed by an internal thread, which posts dispatching jobs to worker or
170             * processes them itself if worker is null.
171             */
172            public void dispatch(final Object arg) {
173                    if (arg!=null) {
174                            if (shutdown) {
175                                    throw new IllegalStateException("Dispatcher is shut down");
176                            }
177                            synchronized (queue) {                  
178                                    // Add job to queue and notify posting thread.
179                                    queue.add(new DispatchJob(arg));
180                                    queue.notifyAll();
181                            }
182                    }
183            }
184            
185            /**
186             * Blocks until all jobs are processed.
187             * @throws InterruptedException 
188             */
189            public void join() throws InterruptedException {
190                    synchronized (jobCounter) {
191                            while (jobCounter[0]!=0) {
192                                    jobCounter.wait();
193                            }
194                    }
195            }
196            
197            private boolean shutdown=false;
198            
199            /**
200             * Processes all jobs and stops queue processing thread.
201             * @throws InterruptedException 
202             *
203             */
204            public void stop() throws InterruptedException {
205                    shutdown=true;
206                    //join();
207                    
208                    // Wakes up processing thread if queue is empty. 
209                    synchronized (queue) {
210                            if (queue.isEmpty()) {
211                                    queue.notifyAll();
212                            }
213                    }
214                    queueProcessor.join();
215    
216            }
217    }