package biz.hammurapi.rules.backwardreasoning;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Accumulates objects. Allows multiple producers to concurrently
 * add objects and multiple consumers to iterate over them.
 * Blocking iterators wait until the next item is available or until all
 * producers are done producing objects.
 *
 * Elements are kept in a singly linked chain that starts at a value-less
 * head link ({@link #first}); closing the accumulator appends the
 * {@link #END} marker to the chain.
 *
 * Addition of nulls is not allowed.
 *
 * In some situations the last value returned by a blocking iterator can be
 * null (the accumulator was closed while the iterator was waiting).
 * Consumers shall check the value returned by the iterator for null and
 * discard it.
 *
 * @author Pavel
 */
public class Accumulator {

    private static final Logger logger = Logger.getLogger(Accumulator.class.getName());

    /**
     * Registered producers. Wrapped in a synchronized list because
     * {@link #toString()} is not synchronized on the accumulator and is
     * invoked from consumer threads (see {@link #onBlock()}) while producers
     * are being added or removed by other threads.
     */
    private final Collection producers = Collections.synchronizedList(new ArrayList());

    /** Type every added element must be an instance of. */
    private final Class type;

    /**
     * Only elements of the specified type can be added to the accumulator.
     * @param type required type of the elements.
     */
    public Accumulator(Class type) {
        this.type = type;
    }

    /**
     * Registers one more producer with the accumulator. The accumulator is
     * closed when {@link #close(Object)} leaves no registered producers.
     * @param producer producer to register.
     */
    public synchronized void addProducer(Object producer) {
        producers.add(producer);
        logger.fine("Producer "+producer+" added to "+this);
    }

    /**
     * @return class name, element type and the currently registered producers.
     */
    public String toString() {
        return getClass().getName()+" of "+type+", producers: "+producers;
    }

    /**
     * Node of the element chain.
     */
    protected interface Link {

        /**
         * @return value held by this link.
         */
        Object getValue();

        /**
         * @return next link without blocking; null if there is no next link yet.
         */
        Link peekNext();

        /**
         * Sets the next link.
         * @param next link that follows this one.
         */
        void setNext(Link next);

        /**
         * @return next link, waiting for it if necessary.
         * @throws InterruptedException if the wait was interrupted.
         */
        Link getNext() throws InterruptedException;
    }

    /**
     * Chain node which lets iterators wait on it until its successor is set.
     */
    protected class LinkImpl implements Link {

        public LinkImpl(Object value) {
            this.value = value;
        }

        Object value;

        /**
         * Volatile because peekNext() reads it without holding the monitor
         * of this link.
         */
        volatile Link next;

        /**
         * Sets next link and notifies waiting iterators.
         * @param next link that follows this one.
         */
        public synchronized void setNext(Link next) {
            this.next = next;
            notifyAll();
        }

        /**
         * Retrieves next link. Blocks until next link is available.
         * {@link Accumulator#onBlock()} is invoked before each wait, and the
         * wait is skipped if that call made the next link available.
         * @return next link, never null.
         * @throws InterruptedException if the wait was interrupted.
         */
        public synchronized Link getNext() throws InterruptedException {
            while (next == null) {
                onBlock();
                // onBlock() may have produced the next link; re-check before waiting.
                if (next == null) {
                    wait();
                }
            }
            return next;
        }

        public Object getValue() {
            return value;
        }

        public Link peekNext() {
            return next;
        }

    }

    /**
     * Marker link appended to the chain when the accumulator is closed.
     * It holds no value and cannot be navigated or modified.
     */
    protected final Link END = new Link() {

        public Link getNext() throws InterruptedException {
            throw new UnsupportedOperationException();
        }

        public Object getValue() {
            return null;
        }

        public Link peekNext() {
            throw new UnsupportedOperationException();
        }

        public void setNext(Link next) {
            throw new UnsupportedOperationException();
        }

    };

    /**
     * Head of the chain. This link holds no value; the first added element
     * is stored in the link that follows it.
     */
    protected final LinkImpl first = new LinkImpl(null);

    /**
     * Tail of the chain, i.e. the link new elements are appended to.
     * Volatile because iterators read it in isClosed() without holding the
     * monitor of the accumulator.
     */
    protected volatile Link current = first;

    /**
     * Builds the exception thrown by iterators when waiting for the next
     * element is interrupted. Restores the interrupt status of the current
     * thread so that callers further up the stack can still observe it.
     */
    private static NoSuchElementException waitInterrupted(InterruptedException e) {
        Thread.currentThread().interrupt();
        NoSuchElementException ex =
                new NoSuchElementException("Waiting for the next element was interrupted: "+e);
        ex.initCause(e);
        return ex;
    }

    /**
     * Returned iterator iterates over available elements and blocks waiting for
     * more elements from producers or for all producers to finish. remove() is
     * not supported by returned iterators.
     *
     * hasNext() does not block: it returns true unless the iterator is known
     * to be at the end of a closed accumulator. next() blocks, and returns
     * null if the accumulator was closed while it was waiting. If the wait is
     * interrupted, next() throws NoSuchElementException and the interrupt
     * status of the thread is preserved.
     * @return Iterator.
     */
    public Iterator blockingIterator() {
        return new Iterator() {

            LinkImpl link = first;

            public boolean hasNext() {
                return link.peekNext() != END;
            }

            public Object next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }

                try {
                    Link nxt = link.getNext();
                    if (nxt == END) {
                        // Closed while waiting: there is no element to return.
                        return null;
                    }
                    link = (LinkImpl) nxt;
                    return link.getValue();
                } catch (InterruptedException e) {
                    throw waitInterrupted(e);
                }
            }

            public void remove() {
                throw new UnsupportedOperationException("remove() is not supported");
            }

        };
    }

    /**
     * Non-blocking iterator which can also report whether the accumulator
     * has been closed.
     */
    public interface AccumulatorIterator extends Iterator {

        /**
         * @return true if no more items are going to be added to the accumulator.
         */
        boolean isClosed();
    }

    /**
     * Returned iterator iterates over available elements. It does not block waiting for
     * more elements from producers or for all producers to finish.
     * When this iterator reaches the end of the elements chain it returns false from hasNext().
     * If elements are added later, it will return true from hasNext() and can be used
     * again to navigate further.
     * remove() is not supported by returned iterators.
     * @return Iterator.
     */
    public AccumulatorIterator iterator() {
        return new AccumulatorIterator() {

            LinkImpl link = first;

            public boolean hasNext() {
                // Single read so that both comparisons see the same link.
                Link nxt = link.peekNext();
                return nxt != null && nxt != END;
            }

            public Object next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }

                try {
                    // The next link is already present, so getNext() does not wait here.
                    link = (LinkImpl) link.getNext();
                    return link.getValue();
                } catch (InterruptedException e) {
                    throw waitInterrupted(e);
                }
            }

            public void remove() {
                throw new UnsupportedOperationException("remove() is not supported");
            }

            public boolean isClosed() {
                return current.peekNext() == END;
            }

        };
    }

    /**
     * Adds object, unblocks waiting iterators.
     * @param obj element to add.
     * @throws NullPointerException if obj is null.
     * @throws IllegalStateException if the accumulator is already closed.
     * @throws ClassCastException if obj is not an instance of the accumulator type.
     */
    public synchronized void add(Object obj) {
        if (obj == null) {
            throw new NullPointerException(this.getClass().getName()+" does not support null elements.");
        }
        if (current.peekNext() == END) {
            throw new IllegalStateException("Accumulator is closed, no more items can be added.");
        }
        if (!type.isInstance(obj)) {
            throw new ClassCastException("Expected "+type.getName()+", got "+obj.getClass().getName());
        }
        // Building the message renders the whole accumulator; skip it unless it will be logged.
        if (logger.isLoggable(Level.FINEST)) {
            logger.finest("----> Element '"+obj+"' added to "+this);
        }
        Link newLink = new LinkImpl(obj);
        newLink.setNext(current.peekNext());
        // Publishing the link wakes up iterators waiting on the old tail.
        current.setNext(newLink);
        current = newLink;
    }

    /**
     * Indicates that the given producer will add no more elements. When no
     * registered producers remain after this call, the accumulator is closed
     * and waiting iterators are released. A warning is logged if the producer
     * was not registered.
     * @param producer producer which has finished.
     */
    public synchronized void close(Object producer) {
        logger.fine("Producer "+producer+ " closes "+this);
        if (!producers.contains(producer)) {
            logger.warning("Producer "+producer+" is not registered with "+this);
        }
        producers.remove(producer);
        if (producers.isEmpty()) {
            current.setNext(END);
            logger.fine("Closed "+this);
        }
    }

    /**
     * This method is invoked before an iterator enters the blocked state.
     * It is called while the monitor of the link being waited on is held.
     * Subclasses can override this to provide unblocking logic.
     */
    protected void onBlock() {
        if (logger.isLoggable(Level.FINEST)) {
            logger.finest("Entering blocked state: "+this);
        }
    }
}