001 package biz.hammurapi.dataflow; 002 003 import java.util.ArrayList; 004 import java.util.Collection; 005 import java.util.Iterator; 006 import java.util.LinkedHashMap; 007 import java.util.List; 008 009 /** 010 * Base class for mixers which join data. 011 * @author Pavel 012 */ 013 public abstract class AbstractMixer implements DataSource { 014 015 /** 016 * Joins inputs. 017 * @author Pavel 018 * 019 */ 020 public interface JoinHelper { 021 022 void join(InputEntry[] inputs, String activatorName, DataSink sink); 023 024 } 025 026 /** 027 * Decides whether given inputs shall be joined. 028 * @author Pavel 029 */ 030 public interface JoinPredicate { 031 032 boolean toBeJoined(InputEntry[] inputs, String activatorName); 033 034 } 035 036 protected DataSink sink; 037 038 public void setSink(DataSink sink) { 039 this.sink = sink; 040 } 041 042 // Keeps inputs preserving addition order. 043 private LinkedHashMap inputs = new LinkedHashMap(); 044 045 private class MixerInput implements DataSink { 046 047 String name; 048 049 Collection data; 050 051 public MixerInput(String name) { 052 this.name = name; 053 data = createDataCollection(name); 054 } 055 056 public boolean addData(Data data) { 057 return AbstractMixer.this.addData(name, data); 058 } 059 060 public boolean controlsMultiplexing() { 061 return AbstractMixer.this.controlsMultiplexing(name); 062 } 063 064 public DataItemInfo[] getDataInfo() { 065 return AbstractMixer.this.getDataInfo(name); 066 } 067 068 } 069 070 /** 071 * Information about input presented to join(). 072 * @author Pavel 073 * 074 */ 075 protected interface InputEntry { 076 077 /** 078 * @return Input name. 079 */ 080 String getName(); 081 082 /** 083 * @return Input data. 084 */ 085 Data getData(); 086 087 /** 088 * Join handler invokes this method to indicate that given 089 * data item shall not be used in any further joins, i.e. it's been 090 * "consumed". 091 */ 092 void removeFromJoin(); 093 094 } 095 096 /** 097 * Abstract mixer presents all permutations of newly arrived data from given input 098 * with previously arrived data from other inputs plus null values to include cases 099 * with inputs without data. 100 * @param inputs Inputs for joining 101 * @param activatorName name of the input which activated this join (newly arrived data) 102 * @param sink DataSink to write results of joining to (if any). 103 */ 104 protected abstract void join(InputEntry[] inputs, String activatorName, DataSink sink); 105 106 /** 107 * @param name 108 * @return Named input. Inputs are created on request. 109 */ 110 public DataSink getInput(String name) { 111 DataSink ret = (DataSink) inputs.get(name); 112 if (ret==null) { 113 ret = new MixerInput(name); 114 inputs.put(name, ret); 115 } 116 117 return ret; 118 } 119 120 /** 121 * @param name Input name. 122 * @return multiplexing control flag for given named input. 123 */ 124 protected boolean controlsMultiplexing(String name) { 125 return false; 126 } 127 128 /** 129 * @param name Input name. 130 * @return Metadata for given named input. 131 */ 132 protected DataItemInfo[] getDataInfo(String name) { 133 return null; 134 } 135 136 /** 137 * Creates collection to hold input data for joining. 138 * This implementation creates ArrayList. Override to create other types 139 * of collections, e.g. Set or persistent collection for long-running, restartable 140 * data flows. 141 * @param name Input name. 142 * @return collection to hold input data for joining. 143 */ 144 protected Collection createDataCollection(String name) { 145 return new ArrayList(); 146 } 147 148 /** 149 * This method invokes join logic. 150 * @param name Input name. 151 * @param data Input data. 152 * @return Always returns false - never consumes input. Override if needed. 153 */ 154 protected boolean addData(String name, Data data) { 155 InputEntry[] inputEntries = new InputEntry[this.inputs.size()]; 156 MixerInput[] mixerInputs = new MixerInput[inputEntries.length]; 157 Iterator it = inputs.values().iterator(); 158 for (int i=0; it.hasNext(); ++i) { 159 mixerInputs[i] = (MixerInput) it.next(); 160 } 161 162 // Start joining data. 163 doJoin(name, data, inputEntries, mixerInputs, 0); 164 165 return false; 166 } 167 168 /** 169 * Iterates over all data items to pass eventually to join(). 170 * Method is synchronized to avoid concurrent modification exceptions. 171 * @param name Activator item name. 172 * @param data Activator data. 173 * @param inputEntries Input entries 174 * @param mixerInputs Mixer inputs. 175 * @param inputIdx Index of currently processed input. 176 * @return If one of entries was returned from join the method returns index 177 * of that entry. All entries with higher index shall stop processing further 178 * join. 179 */ 180 private synchronized int doJoin( 181 String name, 182 Data data, 183 InputEntry[] inputEntries, 184 MixerInput[] mixerInputs, 185 int inputIdx) { 186 187 class InputEntryImpl implements InputEntry { 188 189 String name; 190 Data data; 191 boolean removedFromJoin; 192 193 InputEntryImpl(String name, Data data) { 194 this.data = data; 195 this.name = name; 196 } 197 198 void setData(Data data) { 199 this.data = data; 200 } 201 202 boolean isRemovedFromJoin() { 203 return removedFromJoin; 204 } 205 206 public Data getData() { 207 return data; 208 } 209 210 public String getName() { 211 return name; 212 } 213 214 public void removeFromJoin() { 215 removedFromJoin = true; 216 } 217 218 void resetRemoveFromJoin() { 219 removedFromJoin = false; 220 } 221 222 } 223 224 if (inputIdx == inputEntries.length) { 225 join(inputEntries, name, sink); 226 for (int i=0; i<inputEntries.length; ++i) { 227 if (inputEntries[i] instanceof InputEntryImpl && ((InputEntryImpl) inputEntries[i]).isRemovedFromJoin()) { 228 return i; 229 } 230 } 231 } else { 232 if (mixerInputs[inputIdx].name.equals(name)) { 233 InputEntryImpl currentEntry = new InputEntryImpl(name, data); 234 inputEntries[inputIdx] = currentEntry; 235 int result = doJoin(name, data, inputEntries, mixerInputs, inputIdx+1); 236 if (!currentEntry.isRemovedFromJoin()) { 237 mixerInputs[inputIdx].data.add(data); 238 } 239 return result; 240 } else { 241 inputEntries[inputIdx] = null; 242 int result = doJoin(name, data, inputEntries, mixerInputs, inputIdx+1); // Join with null first. 243 if (result<inputIdx) { 244 return result; 245 } 246 InputEntryImpl currentEntry = new InputEntryImpl(name, null); 247 inputEntries[inputIdx] = currentEntry; 248 Iterator it = mixerInputs[inputIdx].data.iterator(); 249 while (it.hasNext()) { 250 currentEntry.setData((Data) it.next()); 251 result = doJoin(name, data, inputEntries, mixerInputs, inputIdx+1); 252 if (currentEntry.isRemovedFromJoin()) { 253 it.remove(); 254 currentEntry.resetRemoveFromJoin(); 255 } 256 if (result<inputIdx) { 257 return result; 258 } 259 } 260 } 261 } 262 return inputEntries.length; 263 } 264 265 /** 266 * Factory method for data. This implementation creates MapData. Override 267 * to create different type of Data, e.g. persistent data backed by relational 268 * database. 269 * @param chain Chain of data to delegate data item requests. Can be null. 270 * @return 271 */ 272 protected Data createData(Data[] chain) { 273 return chain==null ? new MapData() : new MapData(chain); 274 } 275 276 /** 277 * Creates output data by mounting non-null inputs using their names plus separator 278 * as prefixes. 279 * @param inputs 280 * @return 281 */ 282 protected Data mountInputs(InputEntry[] inputs, String separator) { 283 Data ret = createData(null); 284 for (int i=0; i<inputs.length; ++i) { 285 if (inputs[i]!=null) { 286 Data iData = inputs[i].getData(); 287 if (iData!=null) { 288 ret.mount(inputs[i].getName()+separator, iData); 289 } 290 } 291 } 292 return ret; 293 } 294 295 /** 296 * Creates output data by chaining non-null inputs. 297 * @param inputs 298 * @return 299 */ 300 protected Data chainInputs(InputEntry[] inputs) { 301 List chain = new ArrayList(); 302 for (int i=0; i<inputs.length; ++i) { 303 if (inputs[i]!=null) { 304 Data iData = inputs[i].getData(); 305 if (iData!=null) { 306 chain.add(iData); 307 } 308 } 309 } 310 if (chain.isEmpty()) { 311 return createData(null); 312 } 313 return createData((Data[]) chain.toArray(new Data[chain.size()])); 314 } 315 }