001    package biz.hammurapi.dataflow;
002    
003    import java.util.ArrayList;
004    import java.util.Collection;
005    import java.util.Iterator;
006    import java.util.LinkedHashMap;
007    import java.util.List;
008    
009    /**
010     * Base class for mixers which join data.
011     * @author Pavel
012     */
013    public abstract class AbstractMixer implements DataSource {
014            
015            /**
016             * Joins inputs.
017             * @author Pavel
018             *
019             */
020            public interface JoinHelper {
021                    
022                    void join(InputEntry[] inputs, String activatorName, DataSink sink);
023    
024            }
025            
026            /**
027             * Decides whether given inputs shall be joined.
028             * @author Pavel
029             */
030            public interface JoinPredicate {
031                    
032                    boolean toBeJoined(InputEntry[] inputs, String activatorName);
033    
034            }
035            
036            protected DataSink sink;
037            
038            public void setSink(DataSink sink) {
039                    this.sink = sink;               
040            }
041    
042            // Keeps inputs preserving addition order.
043            private LinkedHashMap inputs = new LinkedHashMap();
044            
045            private class MixerInput implements DataSink {
046                    
047                    String name;            
048                    
049                    Collection data;
050                    
051                    public MixerInput(String name) {
052                            this.name = name;
053                            data = createDataCollection(name);
054                    }
055    
056                    public boolean addData(Data data) {
057                            return AbstractMixer.this.addData(name, data);
058                    }
059    
060                    public boolean controlsMultiplexing() {
061                            return AbstractMixer.this.controlsMultiplexing(name);
062                    }
063    
064                    public DataItemInfo[] getDataInfo() {
065                            return AbstractMixer.this.getDataInfo(name);
066                    }
067                    
068            }
069            
070            /**
071             * Information about input presented to join().
072             * @author Pavel
073             *
074             */
075            protected interface InputEntry {
076                    
077                    /**
078                     * @return Input name.
079                     */
080                    String getName();
081                    
082                    /**
083                     * @return Input data.
084                     */
085                    Data getData();
086                    
087                    /**
088                     * Join handler invokes this method to indicate that given
089                     * data item shall not be used in any further joins, i.e. it's been
090                     * "consumed".
091                     */
092                    void removeFromJoin();
093                    
094            }
095            
096            /**
097             * Abstract mixer presents all permutations of newly arrived data from given input 
098             * with previously arrived data from other inputs plus null values to include cases
099             * with inputs without data.  
100             * @param inputs Inputs for joining
101             * @param activatorName name of the input which activated this join (newly arrived data)
102             * @param sink DataSink to write results of joining to (if any).
103             */
104            protected abstract void join(InputEntry[] inputs, String activatorName, DataSink sink);
105                    
106            /**
107             * @param name
108             * @return Named input. Inputs are created on request.
109             */
110            public DataSink getInput(String name) {
111                    DataSink ret = (DataSink) inputs.get(name);
112                    if (ret==null) {
113                            ret = new MixerInput(name);
114                            inputs.put(name, ret);
115                    }
116                    
117                    return ret;
118            }
119            
120            /**
121             * @param name Input name.
122             * @return multiplexing control flag for given named input.
123             */
124            protected boolean controlsMultiplexing(String name) {
125                    return false;
126            }
127            
128            /**
129             * @param name Input name.
130             * @return Metadata for given named input.
131             */
132            protected DataItemInfo[] getDataInfo(String name) {
133                    return null;
134            }
135            
136            /**
137             * Creates collection to hold input data for joining.
138             * This implementation creates ArrayList. Override to create other types
139             * of collections, e.g. Set or persistent collection for long-running, restartable
140             * data flows.
141             * @param name Input name.
142             * @return collection to hold input data for joining.
143             */
144            protected Collection createDataCollection(String name) {
145                    return new ArrayList();
146            }
147    
148            /**
149             * This method invokes join logic.
150             * @param name Input name.
151             * @param data Input data.
152             * @return Always returns false - never consumes input. Override if needed.
153             */
154            protected boolean addData(String name, Data data) {
155                    InputEntry[] inputEntries = new InputEntry[this.inputs.size()];
156                    MixerInput[] mixerInputs = new MixerInput[inputEntries.length];
157                    Iterator it = inputs.values().iterator();
158                    for (int i=0; it.hasNext(); ++i) {
159                            mixerInputs[i] = (MixerInput) it.next();
160                    }
161                    
162                    // Start joining data.
163                    doJoin(name, data, inputEntries, mixerInputs, 0);
164                    
165                    return false;
166            }
167            
168            /**
169             * Iterates over all data items to pass eventually to join().
170             * Method is synchronized to avoid concurrent modification exceptions.
171             * @param name Activator item name.
172             * @param data Activator data.
173             * @param inputEntries Input entries
174             * @param mixerInputs Mixer inputs.
175             * @param inputIdx Index of currently processed input.
176             * @return If one of entries was returned from join the method returns index
177             * of that entry. All entries with higher index shall stop processing further
178             * join.
179             */
180            private synchronized int doJoin(
181                            String name, 
182                            Data data, 
183                            InputEntry[] inputEntries,
184                            MixerInput[] mixerInputs, 
185                            int inputIdx) {
186                                    
187                    class InputEntryImpl implements InputEntry {
188                            
189                            String name;
190                            Data data;
191                            boolean removedFromJoin;
192                            
193                            InputEntryImpl(String name, Data data) {
194                                    this.data = data;
195                                    this.name = name;
196                            }
197                            
198                            void setData(Data data) {
199                                    this.data = data;
200                            }
201    
202                            boolean isRemovedFromJoin() {
203                                    return removedFromJoin;
204                            }
205                            
206                            public Data getData() {
207                                    return data;
208                            }
209    
210                            public String getName() {
211                                    return name;
212                            }
213    
214                            public void removeFromJoin() {
215                                    removedFromJoin = true;                         
216                            }
217    
218                            void resetRemoveFromJoin() {
219                                    removedFromJoin = false;                        
220                            }
221                            
222                    }
223                    
224                    if (inputIdx == inputEntries.length) {
225                            join(inputEntries, name, sink);
226                            for (int i=0; i<inputEntries.length; ++i) {
227                                    if (inputEntries[i] instanceof InputEntryImpl && ((InputEntryImpl) inputEntries[i]).isRemovedFromJoin()) {
228                                            return i;
229                                    }
230                            }
231                    } else {
232                            if (mixerInputs[inputIdx].name.equals(name)) {
233                                    InputEntryImpl currentEntry = new InputEntryImpl(name, data);
234                                    inputEntries[inputIdx] = currentEntry;
235                                    int result = doJoin(name, data, inputEntries, mixerInputs, inputIdx+1);
236                                    if (!currentEntry.isRemovedFromJoin()) {
237                                            mixerInputs[inputIdx].data.add(data);
238                                    } 
239                                    return result;
240                            } else {
241                                    inputEntries[inputIdx] = null;
242                                    int result = doJoin(name, data, inputEntries, mixerInputs, inputIdx+1); // Join with null first.
243                                    if (result<inputIdx) {
244                                            return result;
245                                    }
246                                    InputEntryImpl currentEntry = new InputEntryImpl(name, null);
247                                    inputEntries[inputIdx] = currentEntry;
248                                    Iterator it = mixerInputs[inputIdx].data.iterator();
249                                    while (it.hasNext()) {
250                                            currentEntry.setData((Data) it.next());
251                                            result = doJoin(name, data, inputEntries, mixerInputs, inputIdx+1);
252                                            if (currentEntry.isRemovedFromJoin()) {
253                                                    it.remove();
254                                                    currentEntry.resetRemoveFromJoin();
255                                            }
256                                            if (result<inputIdx) {
257                                                    return result;
258                                            }
259                                    }
260                            }
261                    }
262                    return inputEntries.length;
263            }
264    
265            /**
266             * Factory method for data. This implementation creates MapData. Override
267             * to create different type of Data, e.g. persistent data backed by relational
268             * database.
269             * @param chain Chain of data to delegate data item requests. Can be null. 
270             * @return
271             */
272            protected Data createData(Data[] chain) {
273                    return chain==null ? new MapData() : new MapData(chain);
274            }
275            
276            /**
277             * Creates output data by mounting non-null inputs using their names plus separator
278             * as prefixes.
279             * @param inputs
280             * @return
281             */
282            protected Data mountInputs(InputEntry[] inputs, String separator) {
283                    Data ret = createData(null);
284                    for (int i=0; i<inputs.length; ++i) {
285                            if (inputs[i]!=null) {
286                                    Data iData = inputs[i].getData();
287                                    if (iData!=null) {
288                                            ret.mount(inputs[i].getName()+separator, iData);
289                                    }
290                            }
291                    }
292                    return ret;
293            }
294    
295            /**
296             * Creates output data by chaining non-null inputs.
297             * @param inputs
298             * @return
299             */
300            protected Data chainInputs(InputEntry[] inputs) {
301                    List chain = new ArrayList();
302                    for (int i=0; i<inputs.length; ++i) {
303                            if (inputs[i]!=null) {
304                                    Data iData = inputs[i].getData();
305                                    if (iData!=null) {
306                                            chain.add(iData);
307                                    }
308                            }
309                    }
310                    if (chain.isEmpty()) {
311                            return createData(null);
312                    }
313                    return createData((Data[]) chain.toArray(new Data[chain.size()]));
314            }
315    }