001 package biz.hammurapi.dataflow; 002 003 import java.util.ArrayList; 004 import java.util.Iterator; 005 import java.util.List; 006 007 /** 008 * Dispatches data from source to sinks in the order of addition. 009 * This class doesn't do data type matching/conversion. Data info 010 * returned by this class is aggregation of data info of connected sinks. 011 * If several sinks have data items with the same name but different type, 012 * this class selects most specific data type. If it is not possible, it chooses 013 * data type of data sink which was added to multiplexor earlier. 014 * @author Pavel 015 */ 016 public class Multiplexor implements DataSink { 017 018 private List sinks = new ArrayList(); 019 020 private DataItemInfo[] dataInfo = {}; 021 022 private String name; 023 024 /** 025 * Dispatched data to sinks. Always returns false (never consumes, even 026 * if one of sinks consumes). 027 */ 028 public boolean addData(Data data) { 029 Iterator sit = sinks.iterator(); 030 while (sit.hasNext()) { 031 if (((DataSink) data).addData(data)) { 032 break; 033 } 034 } 035 return false; 036 } 037 038 /** 039 * Always returns false. 040 */ 041 public boolean controlsMultiplexing() { 042 return false; 043 } 044 045 /** 046 * Combines data input requirements from participating sinks. 047 */ 048 public DataItemInfo[] getDataInfo() { 049 return dataInfo; 050 } 051 052 private DataItemInfo createDataItemInfo(final String name, final Class type, final boolean isRequired) { 053 return new DataItemInfo() { 054 055 public String getName() { 056 return name; 057 } 058 059 public Class getType() { 060 return type; 061 } 062 063 public boolean isRequired() { 064 return isRequired; 065 } 066 067 }; 068 } 069 070 public synchronized void addSink(DataSink sink) { 071 sinks.add(sink); 072 List newDataInfo = new ArrayList(); 073 074 DataItemInfo[] sinkDataInfo = sink.getDataInfo(); 075 Z: for (int sinkIdx=0; sinkIdx<sinkDataInfo.length; ++sinkIdx) { 076 DataItemInfo sinkItem = sinkDataInfo[sinkIdx]; 077 for (int multiplexorIdx = 0; multiplexorIdx<dataInfo.length; ++multiplexorIdx) { 078 DataItemInfo multiplexorItem = dataInfo[multiplexorIdx]; 079 if (sinkItem.getName().equals(multiplexorItem.getName())) { 080 if (sinkItem.getType().isAssignableFrom(multiplexorItem.getType())) { // Multiplexor is more specific 081 if (sinkItem.isRequired() && !multiplexorItem.isRequired()) { 082 dataInfo[multiplexorIdx] = null; 083 newDataInfo.add(createDataItemInfo(sinkItem.getName(), multiplexorItem.getType(), true)); 084 } 085 } else if (multiplexorItem.getType().isAssignableFrom(sinkItem.getType())) { // Sink is more specific 086 dataInfo[multiplexorIdx] = null; 087 newDataInfo.add(createDataItemInfo(sinkItem.getName(), sinkItem.getType(), sinkItem.isRequired() || multiplexorItem.isRequired())); 088 } else { // Unrelated, retain multiplexor's. 089 if (sinkItem.isRequired() && !multiplexorItem.isRequired()) { 090 dataInfo[multiplexorIdx] = null; 091 newDataInfo.add(createDataItemInfo(sinkItem.getName(), multiplexorItem.getType(), true)); 092 } 093 } 094 continue Z; 095 } 096 } 097 newDataInfo.add(sinkItem); 098 } 099 100 for (int i=0; i<dataInfo.length; ++i) { 101 if (dataInfo[i]!=null) { 102 newDataInfo.add(dataInfo[i]); 103 } 104 } 105 106 dataInfo = (DataItemInfo[]) newDataInfo.toArray(new DataItemInfo[newDataInfo.size()]); 107 } 108 109 public String getName() { 110 return name; 111 } 112 113 public void setName(String name) { 114 this.name = name; 115 } 116 117 }