001    package biz.hammurapi.dataflow;
002    
003    import java.util.ArrayList;
004    import java.util.HashMap;
005    import java.util.Iterator;
006    import java.util.List;
007    import java.util.Map;
008    
009    import biz.hammurapi.config.Component;
010    import biz.hammurapi.convert.ConvertingService;
011    import biz.hammurapi.util.Worker;
012    
013    /**
014     * Performs multi-threaded data pumping between data sources and 
015     * sinks. Converts data item types as necessary.
016     * @author Pavel
017     */
018    public class DataPump implements Component {
019    
020            public void setOwner(Object owner) {
021                    // Nothing to do.               
022            }
023            
024            private Worker worker;
025            
026            /**
027             * Worker to execute pumping jobs. If worker is not set, all tasks are 
028             * executed in the starting thread.
029             * @param worker
030             */
031            public void setWorker(Worker worker) {
032                    this.worker = worker;
033            }
034            
035            private class Connection {
036                    DataSource source;
037                    DataSinkProxy sink;
038                    
039                    public Connection(DataSource source, DataSinkProxy sink) {
040                            super();
041                            this.source = source;
042                            this.sink = sink;
043                    }                               
044            }
045            
046            private List connections = new ArrayList();
047            
048            /**
049             * Blocks sources when stopped.
050             * @author Pavel
051             *
052             */
053            protected class DataSinkProxy implements DataSink {
054                    
055                    DataSink sink;
056                    private DataItemInfo[] sinkInfo;
057                    
058                    public DataSinkProxy(DataSink sink) {
059                            this.sink = sink;
060                            this.sinkInfo = sink.getDataInfo();
061                    }
062    
063                    public boolean addData(final Data data) {
064                            if (sink.controlsMultiplexing()) {
065                                    return sink.addData(passData(data, sinkInfo));
066                            } 
067                            
068                            Runnable pumpJob = new Runnable() {
069    
070                                    public void run() {
071                                            sink.addData(passData(data, sinkInfo));
072                                    }
073                                    
074                            };
075                            
076                            postJob(pumpJob);                       
077                            return false;
078                    }
079    
080                    public boolean controlsMultiplexing() {
081                            return sink.controlsMultiplexing();
082                    }
083    
084                    public DataItemInfo[] getDataInfo() {
085                            return sink.getDataInfo();
086                    }
087                    
088            }
089            
090            private boolean isStarted = false;
091            
092            /**
093             * Adds connection to be processed.
094             * @param source
095             * @param sink
096             */
097            public void addConnection(DataSource source, DataSink sink) {
098                    synchronized (connections) {
099                            connections.add(new Connection(source, new DataSinkProxy(sink)));
100                    }
101            }
102            
103            /**
104             * Invoked in start() to establish connection between source and sink.
105             * @param source
106             * @param sink
107             */
108            protected void connect(DataSource source, DataSinkProxy sink) {
109                    source.setSink(new DataSinkProxy(sink));
110            }
111    
112            /**
113             * Invoked in stop() to terminate connection between source and sink.
114             * This implementation invokes source.setSink(null);
115             * @param source
116             * @param sink
117             */
118            protected void disconnect(DataSource source, DataSinkProxy sink) {
119                    source.setSink(null);
120            }
121            
122            /**
123             * Starts data pumping.
124             */
125            public void start() {
126                    synchronized (connections) {
127                            if (!isStarted) {
128                                    isStarted = true;
129                                    Iterator cit = connections.iterator();
130                                    while (cit.hasNext()) {
131                                            Connection con = (Connection) cit.next();
132                                            connect(con.source, con.sink);
133                                    }
134                            }
135                    }               
136            }
137    
138            /**
139             * Stops/pauses data pumping. Start method can be invoked again after
140             * stop to resume data pumping. 
141             */
142            public void stop() {
143                    synchronized (connections) {
144                            if (isStarted) {
145                                    isStarted = false;
146                                    Iterator cit = connections.iterator();
147                                    while (cit.hasNext()) {
148                                            Connection con = (Connection) cit.next();
149                                            disconnect(con.source, con.sink);
150                                    }
151                            }
152                    }               
153            }
154            
155            /**
156             * Passes data from source to sink adjusting data types. If not conversion
157             * is required data is returned as-is.
158             * Override for logging, etc.
159             * @param data
160             * @param sinkInfo 
161             * @return
162             */
163            protected Data passData(Data data, DataItemInfo[] sinkInfo) {
164                    if (sinkInfo==null || sinkInfo.length==0) {
165                            return data;
166                    }
167                    
168                    Map dataItemsWithAdjustedTypes = new HashMap();
169                    for (int i=0; i<sinkInfo.length; ++i) {
170                            Object inputItem = data.get(sinkInfo[i].getName());
171                            if (inputItem!=null && !sinkInfo[i].getType().isInstance(inputItem)) {
172                                    dataItemsWithAdjustedTypes.put(sinkInfo[i].getName(), convert(inputItem, sinkInfo[i].getType()));
173                            }
174                    }
175                    
176                    return dataItemsWithAdjustedTypes.isEmpty() ? data : createData(dataItemsWithAdjustedTypes , data);
177            }
178    
179            /**
180             * Converts data item to expected target type.
181             * This implementation uses ConvertingService. 
182             * Override as needed.
183             * @param source
184             * @param targetType
185             * @return
186             */
187            protected Object convert(Object source, Class targetType) {
188                    return ConvertingService.convert(source, targetType);
189            }
190    
191            /**
192             * Creates new data with return values or error. This implementation creates
193             * Map data.
194             * @param newValues
195             * @param chain
196             * @return
197             */
198            protected Data createData(Map newValues, Data chain) {
199                    return chain==null ? new MapData(newValues) : new MapData(newValues, new Data[] {chain});
200            }
201            
202            /**
203             * This method block until all pumping jobs finish execution.
204             * If worker is not set, this method returns immediately.
205             * @throws InterruptedException 
206             */
207            public void join() throws InterruptedException {
208                    if (worker!=null) {
209                            synchronized (jobCounter) {
210                                    while (jobCounter[0]>0) {
211                                            jobCounter.wait();
212                                    }
213                            }
214                    }
215            }
216            
217            // Counts active jobs.
218            private int[] jobCounter = {0};
219    
220            /**
221             * Executes job either in the current thread or in a worker thread.
222             * @param job
223             */
224            protected void postJob(final Runnable job) {
225                    
226                    Runnable workerJob = new Runnable() {
227    
228                            public void run() {
229                                    try {
230                                            job.run();
231                                    } finally {
232                                            synchronized (jobCounter) {
233                                                    if (--jobCounter[0] <= 0) {
234                                                            jobCounter.notifyAll();
235                                                    }
236                                            }
237                                    }                                       
238                            }
239                            
240                    };
241                    
242                    synchronized (jobCounter) {
243                            ++jobCounter[0];
244                    }
245                    
246                    if (worker==null) {
247                            workerJob.run();
248                    } else {                        
249                            if (!worker.post(workerJob)) {
250                                    workerJob.run();
251                            }
252                    }
253            }
254    }