001    package biz.hammurapi.dataflow;
002    
003    import java.beans.BeanInfo;
004    import java.beans.IntrospectionException;
005    import java.beans.Introspector;
006    import java.beans.PropertyDescriptor;
007    import java.lang.reflect.InvocationTargetException;
008    import java.lang.reflect.Method;
009    import java.util.Collection;
010    import java.util.HashMap;
011    import java.util.Iterator;
012    import java.util.LinkedHashMap;
013    import java.util.Map;
014    
015    import org.apache.bcel.classfile.JavaClass;
016    import org.apache.bcel.classfile.LocalVariable;
017    import org.apache.bcel.classfile.LocalVariableTable;
018    import org.apache.bcel.util.ClassLoaderRepository;
019    
020    import biz.hammurapi.convert.ConvertingService;
021    
022    /**
023     * Data pipe wrapper for Java method.
024     * Input data items are named after method parameters. Wrapped method shall be compiled with
025     * debug on for correct resolution of parameter names. If parameter names cannot 
026     * be resolved, they are automatically named arg0, ...
027     * @author Pavel
028     *
029     */
030    public class JavaMethodDataPipe implements DataPipe {
031            
032            public static final String ERROR = "error";
033            public static final String RESULT = "result";
034    
035            private Method method;
036            private Object instance;
037            private String returnName;
038            private boolean beanToData;
039            private boolean iterateOverReturn;
040            private String[] parameterNames;
041            private Class[] parameterTypes;
042            private DataSink sink;
043            private DataItemInfo[] dataInfo;
044            
045            /**
046             * Creates data pipe which invokes given method.
047             * @param method Method.
048             * @param instance Instances to invoke method against.
049             * @param returnName If beanToData is false, return value is stored in returnName data item. If beanToData is true,
050             * return name prefixes bean property names, unless it is null. If beanToData is false and returnName is null, then
051             * returnName defaults to "result". If method return type is Data then values from it are copied to output prefixed with
052             * return name or as-is if return name is null.
053             * @param beanToData If true return value Java Bean is converted to Data using BeanData class.
054             * @param iterateOverReturn If true and return value is collection or iterator, 
055             * then the return value is iterated over.
056             */
057            public JavaMethodDataPipe(
058                            Method method, 
059                            Object instance, 
060                            String returnName,
061                            boolean beanToData,                     
062                            boolean iterateOverReturn) 
063            {
064                    this.method = method;
065                    this.instance = instance;
066                    this.returnName = returnName;
067                    this.beanToData = beanToData;
068                    this.iterateOverReturn = iterateOverReturn;
069                    parameterTypes = method.getParameterTypes();
070                    parameterNames = new String[parameterTypes.length];
071                    for (int i=0; i<parameterTypes.length; ++i) {
072                            parameterNames[i] = "arg"+i;
073                    }
074                    Class declaringClass = method.getDeclaringClass();
075                    ClassLoaderRepository clr = new ClassLoaderRepository(declaringClass.getClassLoader());
076                    try {
077                            JavaClass dcl = clr.loadClass(declaringClass);
078                            org.apache.bcel.classfile.Method mth = dcl.getMethod(method);
079                            LocalVariableTable lvt = mth.getLocalVariableTable();
080                            LocalVariable[] localVars = lvt==null ? null : lvt.getLocalVariableTable();
081                            if (localVars != null) {
082                                    for (int i=0, j = mth.isStatic() ? 0 : 1; i<parameterNames.length && j<localVars.length; ++i, ++j) {
083                                            LocalVariable localVariable = localVars[j];
084                                            if (localVariable!=null) {
085                                                    parameterNames[i] = localVariable.getName();
086                                            }
087                                    }
088                            }
089                    } catch (ClassNotFoundException e) {
090                            throw new DataFlowException("Should never happen: "+e, e);
091                    }
092                    dataInfo = new DataItemInfo[parameterTypes.length];
093                    for (int i=0; i<dataInfo.length; ++i) {
094                            final int idx = i;
095                            dataInfo[i] = new DataItemInfo() {
096    
097                                    public String getName() {
098                                            return parameterNames[idx];
099                                    }
100    
101                                    public Class getType() {
102                                            return parameterTypes[idx];
103                                    }
104    
105                                    public boolean isRequired() {
106                                            return parameterTypes[idx].isPrimitive();
107                                    }
108                                    
109                            };
110                    }
111            }
112            
113            public boolean addData(Data data) {
114                    Object[] args = new Object[parameterTypes.length];
115                    for (int i=0; i<args.length; ++i) {
116                            args[i] = convert(data.get(parameterNames[i]), parameterTypes[i]);
117                    }
118                    
119                    try {
120                            Object ret = method.invoke(instance, args);
121                            if (ret!=null && sink!=null) {
122                                    if (iterateOverReturn) {
123                                            if (ret instanceof Collection) {
124                                                    Iterator it = ((Collection) ret).iterator();
125                                                    while (it.hasNext()) {
126                                                            resultToSink(it.next(), data);
127                                                    }
128                                            } else if (ret instanceof Iterator) {
129                                                    while (((Iterator) ret).hasNext()) {
130                                                            resultToSink(((Iterator) ret).next(), data);
131                                                    }
132                                            } else {
133                                                    resultToSink(ret, data);
134                                            }
135                                    } else {
136                                            resultToSink(ret, data);                                        
137                                    }
138                            }
139                    } catch (IllegalArgumentException e) {
140                            handleException(e, data);
141                    } catch (InvocationTargetException e) {
142                            Throwable cause = e.getCause();
143                            handleException(cause, data);
144                    } catch (Exception e) {
145                            throw new DataFlowException(e);
146                    }
147                    return false;
148            }       
149    
150            private void resultToSink(Object next, Data input) throws 
151                    IntrospectionException, 
152                    IllegalArgumentException, 
153                    IllegalAccessException, 
154                    InvocationTargetException {
155                    
156                    Map newValues = new HashMap();
157                    if (next instanceof Data) {
158                            Data retData = (Data) next;
159                            Iterator it = retData.getPropertyNames().iterator();
160                            while (it.hasNext()) {
161                                    String pName = (String) it.next();
162                                    newValues.put(returnName==null ? pName : returnName+pName, retData.get(pName));
163                            }
164                    } else if (beanToData) {
165                            BeanInfo beanInfo = Introspector.getBeanInfo(next.getClass());
166                            PropertyDescriptor[] pda = beanInfo.getPropertyDescriptors();
167                            for (int i=0; i<pda.length; ++i) {
168                                    Method readMethod = pda[i].getReadMethod();
169                                    if (readMethod!=null && readMethod.getParameterTypes().length==0) {
170                                            newValues.put(returnName==null ? pda[i].getName() : returnName + pda[i].getName(), readMethod.invoke(next, null));
171                                    }
172                            }
173                    } else {
174                            newValues.put(returnName==null ? RESULT : returnName, next);
175                    }
176                    sink.addData(createData(newValues, input));
177            }
178    
179            private void handleException(Throwable cause, Data input) {
180                    Map.Entry handler = null;
181                    Iterator it = exceptionHandlers.entrySet().iterator();
182                    while (it.hasNext()) {
183                            Map.Entry entry = (Map.Entry) it.next();
184                            Class exceptionType = (Class) entry.getKey();
185                            if (exceptionType.isInstance(cause)) {
186                                    if (handler == null || ((Class) handler.getKey()).isAssignableFrom(exceptionType)) {
187                                            handler = entry;
188                                    }
189                            }
190                    }
191                    
192                    if (handler==null) {
193                            throw new DataFlowException("Unhandled exception", cause);
194                    }
195                    
196                    if (sink!=null) {
197                            Map newValues = new HashMap();
198                            newValues.put(ERROR, cause);
199                            sink.addData(createData(newValues, input));
200                    }
201            }
202    
203            public boolean controlsMultiplexing() {
204                    return false;
205            }
206    
207            public DataItemInfo[] getDataInfo() {
208                    return dataInfo;
209            }
210    
211            public String getName() {
212                    return method.getName();
213            }
214    
215            public void setSink(DataSink sink) {
216                    this.sink = sink;
217            }
218                    
219            private LinkedHashMap exceptionHandlers = new LinkedHashMap();
220    
221            /**
222             * Sets exception handler. Handlers are evaluated in the order of
223             * generality, i.e. handlers are never shadowed.
224             * @param exceptionType
225             * @param sink
226             */
227            public void setExceptionHandler(Class exceptionType, DataSink sink) {
228                    exceptionHandlers.put(exceptionType, sink);
229            }
230            
231            /**
232             * Converts input data item to target type. This implementation used
233             * ConvertingService, override as needed.
234             * @param src Source object
235             * @param targetType
236             * @return
237             */
238            protected Object convert(Object src, Class targetType) {
239                    return ConvertingService.convert(src, targetType);
240            }
241            
242            /**
243             * Creates new data with return values or error. This implementation creates
244             * Map data.
245             * @param newValues
246             * @param chain
247             * @return
248             */
249            protected Data createData(Map newValues, Data chain) {
250                    return chain==null ? new MapData(newValues) : new MapData(newValues, new Data[] {chain});
251            }
252    }