001    package biz.hammurapi.rules.backwardreasoning;
002    
003    import java.util.ArrayList;
004    import java.util.Collection;
005    import java.util.Iterator;
006    import java.util.NoSuchElementException;
007    import java.util.logging.Logger;
008    
009    /**
010     * Accumulates objects. Allows multiple producers concurrently
011     * add objects and multiple consumers iterate over objects.
012     * Iterators block until next item is available or all producers
013     * are done with producing objects.
014     * 
015     * Addition of nulls is not allowed.
016     * 
017     * In some situations last value returned by iterator can be null.
018     * Consumers shall check value returned by iterator for null and discard it.
019     * 
020     * @author Pavel
021     *
022     */
023    public class Accumulator {
024            
025            private static final Logger logger = Logger.getLogger(Accumulator.class.getName());
026            
027            private Collection producers = new ArrayList();
028    
029            private Class type;
030            
031            /**
032             * Only elements of specified type can be added to accumulator.
033             * @param type
034             */
035            public Accumulator(Class type) {
036                    this.type = type;
037            }
038    
039            /**
040             * Indicates that one more produces was added to the accumulator.
041             */
042            public synchronized void addProducer(Object producer) {
043                    producers.add(producer);
044                    logger.fine("Producer "+producer+" added to "+this);
045            }
046            
047            public String toString() {
048                    return getClass().getName()+" of "+type+", producers: "+producers;
049            }
050            
051            protected interface Link {
052                    
053                    Object getValue();
054                    
055                    Link peekNext();
056                    
057                    void setNext(Link next);
058                    
059                    Link getNext() throws InterruptedException;
060            }
061            
062            protected class LinkImpl implements Link {
063                    
064                    public LinkImpl(Object value) {
065                            this.value = value;
066                    }
067                    
068                    Object value;           
069                    Link next;
070                    
071                    /**
072                     * Sets next link and notifies waiting iterators.
073                     * @param value
074                     */
075                    public synchronized void setNext(Link next) {
076                            this.next = next;
077                            notifyAll();                    
078                    }
079                    
080                    /**
081                     * Retrieves next link. Block until next link is available. 
082                     * @return
083                     * @throws InterruptedException
084                     */
085                    public synchronized Link getNext() throws InterruptedException {
086                            while (next==null) {
087                                    onBlock();
088                                    if (next==null) {
089                                            wait();
090                                    }
091                            }
092                            return next;
093                    }
094    
095                    public Object getValue() {
096                            return value;
097                    }
098    
099                    public Link peekNext() {
100                            return next;
101                    }
102                    
103            }
104            
105            protected final Link END = new Link() {
106    
107                    public Link getNext() throws InterruptedException {
108                            throw new UnsupportedOperationException();
109                    }
110    
111                    public Object getValue() {
112                            return null;
113                    }
114    
115                    public Link peekNext() {
116                            throw new UnsupportedOperationException();
117                    }
118    
119                    public void setNext(Link next) {
120                            throw new UnsupportedOperationException();
121                    }
122                    
123            };
124            
125            /**
126             * This link 
127             */
128            protected final LinkImpl first = new LinkImpl(null);
129            
130            protected Link current = first;
131                    
132            /**
133             * Returned iterator iterates over available elements and blocks waiting for
134             * more elements from producers or for all producers to finish. remove() is
135             * not supported by returned iterators.
136             * @return Iterator.
137             */
138            public Iterator blockingIterator() {
139                    return new Iterator() {
140                            
141                            LinkImpl link = first;
142    
143                            public boolean hasNext() {
144                                    return link.peekNext() != END;
145                            }
146    
147                            public Object next() {
148                                    if (!hasNext()) {
149                                            throw new NoSuchElementException();                                     
150                                    }
151                                    
152                                    try {                           
153                                            Link nxt = link.getNext();
154                                            if (nxt == END) {
155                                                    return null;
156                                            }
157                                            link = (LinkImpl) nxt;
158                                            return link.getValue();
159                                    } catch (InterruptedException e) {
160                                            throw new NoSuchElementException("Waiting for the next element was interrupted: "+e);                                   
161                                    }                               
162                            }
163    
164                            public void remove() {
165                                    throw new UnsupportedOperationException("remove() is not supported");                           
166                            }
167                            
168                    };
169            }
170            
171            public interface AccumulatorIterator extends Iterator {
172                    
173                    /**
174                     * @return true if no more items are going to be added to the accumulator.
175                     */
176                    boolean isClosed();
177            }
178    
179            /**
180             * Returned iterator iterates over available elements. It does not block waiting for
181             * more elements from producers or for all producers to finish.
182             * When this iterator reaches the end of elements chain it returns false from hasNext().
183             * If later elements are added, it will return true from hasNext() and can be used
184             * again to navigate further. 
185             * remove() is not supported by returned iterators.
186             * @return Iterator.
187             */
188            public AccumulatorIterator iterator() {
189                    return new AccumulatorIterator() {
190                            
191                            LinkImpl link = first;
192    
193                            public boolean hasNext() {
194                                    return link.peekNext()!=null && link.peekNext()!=END;
195                            }
196    
197                            public Object next() {
198                                    if (!hasNext()) {
199                                            throw new NoSuchElementException();                                     
200                                    }
201                                    
202                                    try {
203                                            link = (LinkImpl) link.getNext();
204                                            return link.getValue();
205                                    } catch (InterruptedException e) {
206                                            throw new NoSuchElementException("Waiting for the next element was interrupted: "+e);                                   
207                                    }                               
208                            }
209    
210                            public void remove() {
211                                    throw new UnsupportedOperationException("remove() is not supported");                           
212                            }
213    
214                            public boolean isClosed() {
215                                    return current.peekNext()==END;
216                            }
217                            
218                    };
219            }
220            
221            /**
222             * Adds object, unblocks waiting iterators.
223             * @param obj
224             */
225            public synchronized void add(Object obj) {
226                    if (obj == null) {
227                            throw new NullPointerException(this.getClass().getName()+" does not support null elements.");
228                    }
229                    if (current.peekNext()==END) {
230                            throw new IllegalStateException("Accumulator is closed, no more items can be added.");
231                    }
232                    if (!type.isInstance(obj)) {
233                            throw new ClassCastException("Expected "+type.getName()+", got "+obj.getClass().getName());
234                    }
235                    logger.finest("----> Element '"+obj+"' added to "+this);
236                    Link newLink = new LinkImpl(obj);
237                    newLink.setNext(current.peekNext());
238                    current.setNext(newLink);
239                    current = newLink;              
240            }
241            
242            /**
243             * Invocation of this method indicates that no more elements will be added to
244             * accumulator.
245             */
246            public synchronized void close(Object producer) {
247                    logger.fine("Producer "+producer+ " closes "+this);             
248                    if (!producers.contains(producer)) {
249                            logger.warning("Producer "+producer+" is not registered with "+this);
250                    }
251                    producers.remove(producer);
252                    if (producers.isEmpty()) {
253                            current.setNext(END);
254                            logger.fine("Closed "+this);
255                    }
256            }
257            
258            /**
259             * This method is invoked before iterator enters blocked state.
260             * Subclasses can override this to provide unblocking logic. 
261             * @param it
262             */
263            protected void onBlock() {
264                    logger.finest("Entering blocked state: "+this);
265            }
266    }