001    package biz.hammurapi.dataflow;
002    
003    import java.util.ArrayList;
004    import java.util.Iterator;
005    import java.util.List;
006    
007    /**
008     * Dispatches data from source to sinks in the order of addition.
009     * This class doesn't do data type matching/conversion. Data info
010     * returned by this class is aggregation of data info of connected sinks.
011     * If several sinks have data items with the same name but different type, 
012     * this class selects most specific data type. If it is not possible, it chooses
013     * data type of data sink which was added to multiplexor earlier.  
014     * @author Pavel
015     */
016    public class Multiplexor implements DataSink {
017            
018            private List sinks = new ArrayList();
019            
020            private DataItemInfo[] dataInfo = {};
021    
022            private String name;
023    
024            /**
025             * Dispatched data to sinks. Always returns false (never consumes, even
026             * if one of sinks consumes).
027             */
028            public boolean addData(Data data) {
029                    Iterator sit = sinks.iterator();
030                    while (sit.hasNext()) {
031                            if (((DataSink) data).addData(data)) {
032                                    break;
033                            }
034                    }
035                    return false;
036            }
037    
038            /**
039             * Always returns false.
040             */
041            public boolean controlsMultiplexing() {
042                    return false;
043            }
044    
045            /**
046             * Combines data input requirements from participating sinks.
047             */
048            public DataItemInfo[] getDataInfo() {
049                    return dataInfo;
050            }
051            
052            private DataItemInfo createDataItemInfo(final String name, final Class type, final boolean isRequired) {
053                    return new DataItemInfo() {
054    
055                            public String getName() {
056                                    return name;
057                            }
058    
059                            public Class getType() {
060                                    return type;
061                            }
062    
063                            public boolean isRequired() {
064                                    return isRequired;
065                            }
066                            
067                    };
068            }
069    
070            public synchronized void addSink(DataSink sink) {
071                    sinks.add(sink);
072                    List newDataInfo = new ArrayList();
073                    
074                    DataItemInfo[] sinkDataInfo = sink.getDataInfo();
075                    Z: for (int sinkIdx=0; sinkIdx<sinkDataInfo.length; ++sinkIdx) {
076                            DataItemInfo sinkItem = sinkDataInfo[sinkIdx];
077                            for (int multiplexorIdx = 0; multiplexorIdx<dataInfo.length; ++multiplexorIdx) {
078                                    DataItemInfo multiplexorItem = dataInfo[multiplexorIdx];
079                                    if (sinkItem.getName().equals(multiplexorItem.getName())) {
080                                            if (sinkItem.getType().isAssignableFrom(multiplexorItem.getType())) { // Multiplexor is more specific
081                                                    if (sinkItem.isRequired() && !multiplexorItem.isRequired()) {
082                                                            dataInfo[multiplexorIdx] = null;
083                                                            newDataInfo.add(createDataItemInfo(sinkItem.getName(), multiplexorItem.getType(), true));                                                       
084                                                    }
085                                            } else if (multiplexorItem.getType().isAssignableFrom(sinkItem.getType())) { // Sink is more specific
086                                                    dataInfo[multiplexorIdx] = null;
087                                                    newDataInfo.add(createDataItemInfo(sinkItem.getName(), sinkItem.getType(), sinkItem.isRequired() || multiplexorItem.isRequired()));                                                                                                     
088                                            } else { // Unrelated, retain multiplexor's.
089                                                    if (sinkItem.isRequired() && !multiplexorItem.isRequired()) {
090                                                            dataInfo[multiplexorIdx] = null;
091                                                            newDataInfo.add(createDataItemInfo(sinkItem.getName(), multiplexorItem.getType(), true));                                                       
092                                                    }                                               
093                                            }
094                                            continue Z;
095                                    }
096                            }
097                            newDataInfo.add(sinkItem);
098                    }
099    
100                    for (int i=0; i<dataInfo.length; ++i) {
101                            if (dataInfo[i]!=null) {
102                                    newDataInfo.add(dataInfo[i]);
103                            }
104                    }
105    
106                    dataInfo = (DataItemInfo[]) newDataInfo.toArray(new DataItemInfo[newDataInfo.size()]);
107            }
108    
109            public String getName() {
110                    return name;
111            }
112            
113            public void setName(String name) {
114                    this.name = name;
115            }
116    
117    }