package biz.hammurapi.dataflow;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import biz.hammurapi.config.Component;
import biz.hammurapi.convert.ConvertingService;
import biz.hammurapi.util.Worker;

/**
 * Performs multi-threaded data pumping between data sources and
 * sinks. Converts data item types as necessary.
 * @author Pavel
 */
public class DataPump implements Component {

    public void setOwner(Object owner) {
        // Nothing to do.
    }

    private Worker worker;

    /**
     * Worker to execute pumping jobs. If worker is not set, all tasks are
     * executed in the posting thread.
     * @param worker job executor; may be null for synchronous execution.
     */
    public void setWorker(Worker worker) {
        this.worker = worker;
    }

    /**
     * Pairs a data source with the proxy wrapping its sink.
     * Static - it does not use the enclosing DataPump instance, so it
     * must not retain a hidden reference to it.
     */
    private static class Connection {
        final DataSource source;
        final DataSinkProxy sink;

        Connection(DataSource source, DataSinkProxy sink) {
            this.source = source;
            this.sink = sink;
        }
    }

    // Registered source/sink pairs. Guarded by its own monitor; final so the
    // lock object can never change. Raw type retained to match the project's
    // pre-generics style.
    private final List connections = new ArrayList();

    /**
     * Sink proxy which routes deliveries through {@link DataPump#postJob(Runnable)}
     * and adjusts data item types via {@link DataPump#passData(Data, DataItemInfo[])}.
     * Blocks sources when stopped (sources are disconnected in stop()).
     * @author Pavel
     */
    protected class DataSinkProxy implements DataSink {

        DataSink sink;
        // Cached once at construction; assumed stable for the sink's lifetime.
        private DataItemInfo[] sinkInfo;

        public DataSinkProxy(DataSink sink) {
            this.sink = sink;
            this.sinkInfo = sink.getDataInfo();
        }

        /**
         * Delivers data to the wrapped sink. If the sink controls
         * multiplexing, delivery happens synchronously in the calling thread
         * and the sink's own result is returned. Otherwise delivery is
         * posted as a job and this method returns false immediately
         * (the asynchronous result is unknown at this point).
         */
        public boolean addData(final Data data) {
            if (sink.controlsMultiplexing()) {
                return sink.addData(passData(data, sinkInfo));
            }

            Runnable pumpJob = new Runnable() {

                public void run() {
                    sink.addData(passData(data, sinkInfo));
                }

            };

            postJob(pumpJob);
            return false;
        }

        public boolean controlsMultiplexing() {
            return sink.controlsMultiplexing();
        }

        public DataItemInfo[] getDataInfo() {
            return sink.getDataInfo();
        }

    }

    // Guarded by the connections monitor.
    private boolean isStarted = false;

    /**
     * Adds connection to be processed. The sink is wrapped in a
     * {@link DataSinkProxy} here, once, so start()/connect() can pass it
     * to the source as-is.
     * @param source data producer
     * @param sink data consumer
     */
    public void addConnection(DataSource source, DataSink sink) {
        synchronized (connections) {
            connections.add(new Connection(source, new DataSinkProxy(sink)));
        }
    }

    /**
     * Invoked in start() to establish connection between source and sink.
     * The sink argument is already a DataSinkProxy created in
     * addConnection() and is passed to the source unchanged.
     * BUG FIX: the previous implementation wrapped it in a second proxy,
     * which made every data item pass through passData() twice and counted
     * each delivery as two jobs in jobCounter.
     * @param source data producer
     * @param sink proxy wrapping the data consumer
     */
    protected void connect(DataSource source, DataSinkProxy sink) {
        source.setSink(sink);
    }

    /**
     * Invoked in stop() to terminate connection between source and sink.
     * This implementation invokes source.setSink(null);
     * @param source data producer
     * @param sink proxy wrapping the data consumer
     */
    protected void disconnect(DataSource source, DataSinkProxy sink) {
        source.setSink(null);
    }

    /**
     * Starts data pumping. Idempotent: repeated calls while started
     * have no effect.
     */
    public void start() {
        synchronized (connections) {
            if (!isStarted) {
                isStarted = true;
                Iterator cit = connections.iterator();
                while (cit.hasNext()) {
                    Connection con = (Connection) cit.next();
                    connect(con.source, con.sink);
                }
            }
        }
    }

    /**
     * Stops/pauses data pumping. Start method can be invoked again after
     * stop to resume data pumping. Idempotent: repeated calls while
     * stopped have no effect. Does not wait for in-flight jobs; use
     * {@link #join()} for that.
     */
    public void stop() {
        synchronized (connections) {
            if (isStarted) {
                isStarted = false;
                Iterator cit = connections.iterator();
                while (cit.hasNext()) {
                    Connection con = (Connection) cit.next();
                    disconnect(con.source, con.sink);
                }
            }
        }
    }

    /**
     * Passes data from source to sink adjusting data types. If no conversion
     * is required data is returned as-is.
     * Override for logging, etc.
     * @param data incoming data
     * @param sinkInfo item descriptors expected by the sink; may be null or empty
     * @return original data, or new data chaining the original with converted items
     */
    protected Data passData(Data data, DataItemInfo[] sinkInfo) {
        if (sinkInfo == null || sinkInfo.length == 0) {
            return data;
        }

        // Collect only the items whose runtime type does not match the
        // sink's expectation; the remaining items are resolved through
        // the chained original data.
        Map dataItemsWithAdjustedTypes = new HashMap();
        for (int i = 0; i < sinkInfo.length; ++i) {
            Object inputItem = data.get(sinkInfo[i].getName());
            if (inputItem != null && !sinkInfo[i].getType().isInstance(inputItem)) {
                dataItemsWithAdjustedTypes.put(sinkInfo[i].getName(), convert(inputItem, sinkInfo[i].getType()));
            }
        }

        return dataItemsWithAdjustedTypes.isEmpty() ? data : createData(dataItemsWithAdjustedTypes, data);
    }

    /**
     * Converts data item to expected target type.
     * This implementation uses ConvertingService.
     * Override as needed.
     * @param source value to convert
     * @param targetType type expected by the sink
     * @return converted value
     */
    protected Object convert(Object source, Class targetType) {
        return ConvertingService.convert(source, targetType);
    }

    /**
     * Creates new data with converted values chained to the original data.
     * This implementation creates Map data.
     * @param newValues converted name/value pairs
     * @param chain original data to fall back to; may be null
     * @return new Data instance
     */
    protected Data createData(Map newValues, Data chain) {
        return chain == null ? new MapData(newValues) : new MapData(newValues, new Data[] {chain});
    }

    /**
     * This method blocks until all pumping jobs finish execution.
     * If worker is not set, this method returns immediately (jobs run
     * synchronously in the posting thread, so none can be outstanding).
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void join() throws InterruptedException {
        if (worker != null) {
            synchronized (jobCounter) {
                while (jobCounter[0] > 0) {
                    jobCounter.wait();
                }
            }
        }
    }

    // Counts active jobs. Single-element array so it can serve both as a
    // mutable counter and as the monitor/wait object; final so the lock
    // object can never change.
    private final int[] jobCounter = {0};

    /**
     * Executes job either in the current thread or in a worker thread.
     * The job counter is incremented before submission and decremented in
     * a finally block when the job completes, waking any join() waiters
     * when it reaches zero. Falls back to synchronous execution if the
     * worker rejects the job.
     * @param job work to execute
     */
    protected void postJob(final Runnable job) {

        Runnable workerJob = new Runnable() {

            public void run() {
                try {
                    job.run();
                } finally {
                    synchronized (jobCounter) {
                        if (--jobCounter[0] <= 0) {
                            jobCounter.notifyAll();
                        }
                    }
                }
            }

        };

        synchronized (jobCounter) {
            ++jobCounter[0];
        }

        if (worker == null) {
            workerJob.run();
        } else {
            if (!worker.post(workerJob)) {
                // Worker refused the job - degrade to synchronous execution
                // rather than silently dropping the data.
                workerJob.run();
            }
        }
    }
}