001    /*
002     * hammurapi-rules @mesopotamia.version@
003     * Hammurapi rules engine. 
004     * Copyright (C) 2005  Hammurapi Group
005     *
006     * This program is free software; you can redistribute it and/or
007     * modify it under the terms of the GNU Lesser General Public
008     * License as published by the Free Software Foundation; either
009     * version 2 of the License, or (at your option) any later version.
010     *
011     * This program is distributed in the hope that it will be useful,
012     * but WITHOUT ANY WARRANTY; without even the implied warranty of
013     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
014     * Lesser General Public License for more details.
015     *
016     * You should have received a copy of the GNU Lesser General Public
017     * License along with this library; if not, write to the Free Software
018     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
019     *
020     * URL: http://www.hammurapi.biz
021     * e-Mail: support@hammurapi.biz
022     */
023    package biz.hammurapi.rules.backwardreasoning;
024    
025    import java.lang.reflect.UndeclaredThrowableException;
026    import java.util.ArrayList;
027    import java.util.Collection;
028    import java.util.Collections;
029    import java.util.HashMap;
030    import java.util.Iterator;
031    import java.util.List;
032    import java.util.Map;
033    import java.util.NoSuchElementException;
034    import java.util.Set;
035    import java.util.logging.Level;
036    import java.util.logging.Logger;
037    
038    import org.w3c.dom.Element;
039    import org.w3c.dom.Node;
040    
041    import biz.hammurapi.config.ConfigurationException;
042    import biz.hammurapi.config.DomConfigurableContainer;
043    import biz.hammurapi.config.RuntimeConfigurationException;
044    import biz.hammurapi.dispatch.InvocationHandler;
045    import biz.hammurapi.dispatch.ResultConsumer;
046    import biz.hammurapi.rules.CollectionManager;
047    import biz.hammurapi.rules.Constants;
048    import biz.hammurapi.rules.FactSource;
049    import biz.hammurapi.rules.Rule;
050    import biz.hammurapi.rules.RulesRuntimeException;
051    import biz.hammurapi.rules.backwardreasoning.Accumulator.AccumulatorIterator;
052    import biz.hammurapi.util.ExceptionSink;
053    import biz.hammurapi.util.Worker;
054    import biz.hammurapi.xml.dom.DOMUtils;
055    
056    /**
057     * Backward reasoning rules container.
058     * How to use:
059     * <OL>
060     * <LI>Instantiate from XML definition.</LI>
061     * <LI>Set collection manager if rule set contains join rules (inference methods with more than one parameter).</LI>
062     * <LI>Set worker for concurrent reasoning. (Optional)</LI>
063     * <LI>Set exception sink. Otherwise all exceptions will be reported to the logger. (Optional)</LI>
064     * <LI>Set fact sources.</LI>
065     * <LI>Invoke getFacts() for fact/conclusion type of interest.</LI>
066     * </OL>
067     * 
068     * @author Pavel Vlasov
069     * @revision $Revision$
070     */
071    public class BackwardReasoningRulesContainer extends DomConfigurableContainer implements Constants, FactSource {
072            
073            private static final Logger logger = Logger.getLogger(BackwardReasoningRulesContainer.class.getName());
074            
075            // Primarily for troubleshooting.
076            private Collection allJobs = new ArrayList();
077    
078            private static int[] jobNumberCounter = {0};
079            
080            /**
081             * Job with number.
082             */
083            private abstract class ReasoningJob implements Runnable {
084                    
085                    int number;
086                    int pass;
087                    String name;
088                    boolean isComplete;
089                    boolean isInProgress;
090                    
091                    // Each job works on one input fact type.
092                    protected Class inputType;
093                    
094                    public ReasoningJob(String name, Class inputType) {
095                            synchronized (jobNumberCounter) {
096                                    number = ++jobNumberCounter[0];
097                            }
098                            this.name = name;
099                            this.inputType = inputType;
100                            synchronized (allJobs) {
101                                    allJobs.add(this);
102                            }
103                            logger.fine("New job: "+this);
104                    }
105                    
106                    public String toString() {
107                            return "Job # "+number+" pass "+pass+" '"+name+"' working on "+inputType.getName()+" ("+getClass().getName()+")";
108                    }                               
109                    
110            }
111    
112            /**
113             * Job which processes data from accumulator iterator, i.e. 
114             * new items may become available in the iterator and the job
115             * needs to be repeated until iterator is closed.
116             * 
117             * The job adds itself to incremental jobs queue if there are 
118             * no more items and iterator hasNext() returned false. Incremental
119             * jobs are accumulated and are sent to execution after some progress
120             * has been made by other jobs.
121             * @author Pavel
122             *
123             */
124            private abstract class IncrementalJob extends ReasoningJob {
125                    
126                    AccumulatorIterator iterator;
127    
128                    IncrementalJob(String name, Class inputType, AccumulatorIterator iterator) {
129                            super(name, inputType);
130                            this.iterator = iterator;
131                            this.inputType = inputType;
132                    }
133    
134                    public void run() {
135                            logger.fine("Running "+this.toString()+" in "+Thread.currentThread().getName());
136                            isInProgress = true;
137                            while (iterator.hasNext()) {
138                                    doIteration(iterator.next());
139                            }
140                            
141                            if (iterator.isClosed()) {
142                                    isComplete = true;
143                                    close();
144                                    synchronized (allJobs) {
145                                            allJobs.remove(this);
146                                            logger.fine("Job finished: "+this.toString()+" in "+Thread.currentThread().getName());
147                                    }
148                            } else {
149                                    logger.fine("Job sent to next pass: "+this.toString()+" in "+Thread.currentThread().getName());                         
150                                    queueIncrementalJob(this);
151                            }
152                    }
153    
154                    /**
155                     * This method is invoked for each iterator value.
156                     * If the job produced conclusions from that value,
157                     * it should invoke onProgress(jobNumber, factTypes) to inform
158                     * other jobs that there is additional information 
159                     * for them.
160                     * @param next
161                     */
162                    abstract void doIteration(Object next);
163                    
164                    abstract void close();
165                    
166            }
167            
168            /**
169             * This class pumps data from one accumulator to another (from subtype to type).
170             * @author Pavel
171             *
172             */
173            private class AccumulatorJob extends IncrementalJob {
174                    
175                    Accumulator accumulator;
176                    Class outputType;
177    
178                    AccumulatorJob(String name, Class inputType, Class outputType, AccumulatorIterator iterator, Accumulator accumulator) {
179                            super(name, inputType, iterator);
180                            this.accumulator = accumulator;
181                            accumulator.addProducer(this);
182                            this.outputType = outputType;
183                    }
184    
185                    void close() {
186                            logger.fine("Accumulator job closed: "+this);
187                            accumulator.close(this);
188                    }
189    
190                    void doIteration(Object next) {
191                            if (next!=null) {
192                                    accumulator.add(next);
193                                    onProgress(this, outputType);
194                            }
195                    }
196                    
197            }
198    
199            /**
200             * Reads facts from external fact source to accumulator.
201             * @author Pavel
202             */
203            private class FactSourceJob extends ReasoningJob {
204    
205                    private FactSource factSource;
206                    private Accumulator accumulator;
207    
208                    public FactSourceJob(FactSource factSource, Class inputType) {
209                            super("Reads "+inputType.getName()+" external fact source "+factSource, inputType);
210                            this.factSource = factSource;
211                            TypeEntry te = getTypeEntry(inputType);
212                            accumulator = te.addJob(this);
213                    }
214    
215                    public void run() {
216                            isInProgress = true;
217                            try {
218                                    Iterator it = factSource.getFacts(inputType);
219                                    while (it.hasNext()) {
220                                            Object next = it.next();
221                                            if (next!=null) {
222                                                    accumulator.add(next);
223                                                    onProgress(this, inputType);
224                                            }
225                                    }
226                            } finally {
227                                    accumulator.close(this);
228                                    isComplete = true;
229                                    synchronized (allJobs) {
230                                            allJobs.remove(this);
231                                    }
232                            }
233                    }               
234            }               
235            
236            /**
237             * Executes invocation handler
238             * @author Pavel
239             *
240             */
241            private class InvocationHandlerJob extends IncrementalJob {
242                    
243                    InvocationHandler invocationHandler;
244                    Class[] outputTypes;
245                    ResultConsumer resultConsumer;
246                    
247                    // Class -> Accumulator
248                    Map accumulatorMap = new HashMap();
249    
250                    InvocationHandlerJob(String name, InvocationHandler invocationHandler) {                        
251                            super(name, invocationHandler.getParameterType(), getTypeEntry(invocationHandler.getParameterType()).lazyIterator());
252                            this.invocationHandler = invocationHandler;
253                            
254                            outputTypes = invocationHandler.getFactTypes();
255                            for (int i=0; i<outputTypes.length; ++i) {
256                                    accumulatorMap.put(outputTypes[i], getTypeEntry(outputTypes[i]).addJob(this));
257                            }
258                            
259                            resultConsumer = new ResultConsumer() {
260    
261                                    public void consume(Object result) {                                    
262                                            for (int i=0; i<outputTypes.length; ++i) {
263                                                    if (outputTypes[i].isInstance(result)) {
264                                                            ((Accumulator) accumulatorMap.get(outputTypes[i])).add(result);
265                                                            onProgress(InvocationHandlerJob.this, outputTypes[i]);
266                                                    }
267                                            }
268                                    }
269                                    
270                            };
271                    }
272    
273                    void close() {
274                            logger.fine("Invocation handler job closed: "+this);
275                            for (int i=0; i<outputTypes.length; ++i) {
276                                    ((Accumulator) accumulatorMap.get(outputTypes[i])).close(this);
277                            }
278                    }
279    
280                    void doIteration(Object next) {
281                            if (inputType.isInstance(next)) {
282                                    try {
283                                            invocationHandler.invoke(next, resultConsumer);
284                                    } catch (Exception e) {
285                                            if (exceptionSink==null) {
286                                                    logger.log(Level.SEVERE, "Exception in "+this+": "+e, e);
287                                            } else {
288                                                    exceptionSink.consume(this, e);
289                                            }
290                                    } catch (Throwable th) {
291                                            if (th instanceof Error) {
292                                                    throw (Error) th;
293                                            }
294                                            throw new UndeclaredThrowableException(th);
295                                    }
296                            }
297                    }               
298            }
299            
300            // Class -> TypeEntry
301            private Map typeEntries = new HashMap();
302            
303            /**
304             * 
305             * @author Pavel
306             *
307             */
308            private class TypeEntry {
309                    Class type;
310                    Accumulator theAccumulator; 
311                    Collection thePendingJobs = new ArrayList();
312                    
313                    TypeEntry(Class type) {
314                            this.type = type;
315                            theAccumulator = createAccumulator(type);
316                    }
317                    
318                    Accumulator addJob(ReasoningJob job) {
319                            thePendingJobs.add(job);
320                            theAccumulator.addProducer(job);
321                            return theAccumulator;
322                    }
323                    
324                    /**
325                     * If there are pending jobs for the accumulator, they are posted to execution.
326                     * @return Accumulator with all jobs scheduled for execution.
327                     */
328                    Accumulator getAccumulator() {
329                            Collection toPost = new ArrayList();
330                            synchronized (thePendingJobs) {
331                                    Iterator it = thePendingJobs.iterator();
332                                    while (it.hasNext()) {
333                                            ReasoningJob pendingJob = (ReasoningJob) it.next();
334                                            synchronized (pendingJob) {
335                                                    if (pendingJob.isComplete || pendingJob.isInProgress) {
336                                                            it.remove();
337                                                    } else {
338                                                            pendingJob.isInProgress = true;
339                                                            toPost.add(pendingJob);
340                                                    }
341                                            }
342                                    }
343                            }
344    
345                            Iterator it = toPost.iterator();
346                            while (it.hasNext()) {
347                                    postJob((ReasoningJob) it.next());
348                            }
349                            return theAccumulator;
350                    }
351                    
352                    
353                    /**
354                     * @return Iterator which activates pending jobs on first access.
355                     */
356                    AccumulatorIterator lazyIterator() {                    
357                            return new Accumulator.AccumulatorIterator() {
358                                    
359                                    AccumulatorIterator master;
360                                    
361                                    AccumulatorIterator getMaster() {
362                                            if (master==null) {
363                                                    master = getAccumulator().iterator();
364                                            }
365                                            return master;
366                                    }
367    
368                                    public boolean isClosed() {
369                                            return getMaster().isClosed();
370                                    }
371    
372                                    public boolean hasNext() {
373                                            return getMaster().hasNext();
374                                    }
375    
376                                    public Object next() {
377                                            return getMaster().next();
378                                    }
379    
380                                    public void remove() {
381                                            getMaster().remove();
382                                    }
383                                    
384                            };
385                    }
386            }
387            
               // Optional worker for concurrent execution of jobs. When null, postJob() runs jobs in the posting thread.
388            private Worker worker;
               // Fact sources added with addFactSource()/addFactSources(). start() creates a job per source and fact type.
389            private Collection factSources = new ArrayList();
               // Limit of active jobs above which postJob() runs jobs in the posting thread. Zero or less - no limit.
390            private int maxWorkerJobs;
               // Optional consumer of exceptions thrown by invocation handlers. When null, such exceptions are logged.
391            private ExceptionSink exceptionSink;
392            
               // Set to true by start(). Fact sources cannot be added after that.
393            private boolean started;
394            
395            /**
396             * Adds a fact source to the container.
397             * Fact sources shall be added before the container is started.
398             * @param factSource Fact source to add.
                * @throws IllegalStateException If the container has already been started.
399             */
400            public void addFactSource(FactSource factSource) {
401                    if (started) {
402                            throw new IllegalStateException("Fact source shall be set before container is started");
403                    }
404                    factSources.add(factSource);
405            }
406            
407            /**
408             * Sets exception sink to consume exceptions thrown by invocation handlers during reasoning.
409             * If exception sink is not set, such exceptions are sent to logger with SEVERE level.
410             * @param exceptionSink Exception sink, null to report exceptions to the logger.
411             */
412            public void setExceptionSink(ExceptionSink exceptionSink) {
413                    this.exceptionSink = exceptionSink;
414            }
415            
416            /**
417             * When concurrent reasoning is used, deadlocks are possible depending on worker implementation/configuration.
418             * If excessive jobs are queued, a situation is possible when all active jobs wait for a queued job to
419             * complete and that job never gets to execution.
420             * 
421             * Setting maxWorkerJobs to a value greater than zero limits the number of concurrent jobs posted
422             * to the worker. Once the number of active jobs reaches the limit, further jobs are executed 
423             * in the invoking thread to avoid being queued by the worker.
424             * @param maxWorkerJobs Limit of active jobs, zero or less for no limit.
425             */
426            public void setMaxWorkerJobs(int maxWorkerJobs) {
427                    this.maxWorkerJobs = maxWorkerJobs;
428            }
429            
430            /**
431             * Adds multiple fact sources to the container.
432             * Fact sources shall be added before the container is started.
433             * @param factSources Collection of fact sources to add.
                * @throws IllegalStateException If the container has already been started.
434             */
435            public void addFactSources(Collection factSources) {
436                    if (started) {
437                            throw new IllegalStateException("Fact sources shall be set before container is started.");
438                    }
439                    this.factSources.addAll(factSources);
440            }
441            
442            private TypeEntry getTypeEntry(Class type) {
443                    TypeEntry ret = (TypeEntry) typeEntries.get(type);
444                    if (ret == null) {
445                            ret = new TypeEntry(type);                      
446                            typeEntries.put(type, ret);
447                    }
448                    return ret;
449            }
450            
451            // Maximum number of times incremental job can be invoked.
452            protected int maxPass;
453            
454            public void start() throws ConfigurationException {
455                    super.start();
456                    started = true;
457                    Iterator fit = factSources.iterator();
458                    while (fit.hasNext()) {
459                            FactSource fs = (FactSource) fit.next();
460                            Class[] fst = fs.getFactTypes();
461                            for (int i=0; i<fst.length; ++i) {
462                                    new FactSourceJob(fs, fst[i]);
463                            }
464                    }
465    
466                    Iterator rit = getComponents().iterator();
467                    while (rit.hasNext()) {
468                            Object component = rit.next();
469                            if (component instanceof Rule) {
470                                    Rule rule = (Rule) component;
471                                    Iterator ihit = rule.getInvocationHandlers().iterator();
472                                    while (ihit.hasNext()) {
473                                            InvocationHandler ih = (InvocationHandler) ihit.next();
474                                            new InvocationHandlerJob(rule.getName(), ih);
475                                    }
476                            }
477                    }
478                    
479                    Set typeSet = typeEntries.keySet();
480                    types = new Class[typeSet.size()];
481                    Iterator tit = typeSet.iterator();
482                    for (int i=0; tit.hasNext(); ++i) {
483                            types[i] = (Class) tit.next();
484                    }
485    
486                    maxPass = allJobs.size()*allJobs.size()*1000;
487                    
488                    Iterator it = allJobs.iterator();
489                    if (logger.isLoggable(Level.FINEST)) {
490                            logger.finest("All jobs created");
491                            while (it.hasNext()) {
492                                    logger.finest("\t"+it.next());
493                            }
494                    }
495            }
496            
497            /**
498             * Collection manager is required for join (multi-parameter) rules.
                * The manager is added to the container as a component with COLLECTION_MANAGER name.
499             * @param collectionManager Collection manager to use.
500             * @throws ConfigurationException Propagated from addComponent().
501             */
502            public void setCollectionManager(CollectionManager collectionManager) throws ConfigurationException {
503                    addComponent(COLLECTION_MANAGER, collectionManager);
504            }
505            
506            /**
507             * Set worker for concurrent reasoning. E.g. if rules access database or 
508             * network resources for different types of facts, retrieval of these facts
509             * can be done in parallel. If worker is not set, jobs are executed in the 
                * thread which posts them.
510             * @param worker Worker to post reasoning jobs to.
511             */
512            public void setWorker(Worker worker) {
513                    this.worker = worker;
514            }
515            
               /**
                * NOTE(review): returns all container components, not only instances of Rule. 
                * setCollectionManager() also calls addComponent(), so the collection manager 
                * may be part of the result - confirm against getComponents().
                * @return Unmodifiable view of the collection returned by getComponents().
                */
516            public Collection getRules() {
517                    return Collections.unmodifiableCollection(getComponents());
518            }
519            
520            protected String getComponentName(Node node) {
521                    try {
522                            return DOMUtils.getSingleNonBlankElementText((Element) node, "name");
523                    } catch (Exception e) {
524                            throw new RuntimeConfigurationException("Cannot read rule name: "+e, e);
525                    }
526            }
527            
528            /**
529             * This method blocks until all reasoning jobs finish execution.
530             * If worker is not set, this method returns immediately.
531             * @throws InterruptedException 
532             */
533            public void join() throws InterruptedException {
534                    if (worker!=null) {
535                            synchronized (jobCounter) {
536                                    while (jobCounter[0]>0) {
537                                            jobCounter.wait();
538                                    }
539                            }
540                    }
541            }
542                    
               // Incremental jobs which drained their iterators and wait for new facts of their input type.
               // Guarded by its own monitor. onProgress() takes jobs from this queue and posts them again.
543            private Collection incrementalJobsQueue = new ArrayList();
544            
               /**
                * Puts incremental job to the queue of jobs waiting for progress on their input type.
                * @param job Job which has no more items to process at the moment.
                */
545            private void queueIncrementalJob(IncrementalJob job) {
546                    synchronized (incrementalJobsQueue) {
547                            incrementalJobsQueue.add(job);
548                            incrementalJobsQueue.notifyAll(); // just in case somebody waits on the queue
549                    }
550            }
551            
552            /**
553             * Method to inform the reasoning system that job made progress
554             * on producing facts of given type.
555             */
556            private void onProgress(ReasoningJob job, Class factType) {
557                    logger.finest("Job "+job+" made progress on "+factType.getName());
558                    Collection toActivate = new ArrayList();
559                    synchronized (incrementalJobsQueue) {
560                            Iterator it = incrementalJobsQueue.iterator();
561                            while (it.hasNext()) {
562                                    IncrementalJob iJob = (IncrementalJob) it.next();
563                                    if (iJob.inputType.equals(factType)) {
564                                            toActivate.add(iJob);
565                                            logger.fine("Incremental job is sent to additional pass: "+iJob);
566                                            it.remove();
567                                    }
568                            }
569                    }
570    
571                    Iterator it = toActivate.iterator();
572                    while (it.hasNext()) {
573                            postJob((ReasoningJob) it.next());
574                    }
575            }
576            
577            // Every time job goes to execution it is assigned incremental number.
578            private int[] passCounter = {0};
579            
580            // Counts active jobs.
581            private int[] jobCounter = {0};
582    
583            /**
584             * Executes job either in the current thread or in a worker thread.
585             * @param job
586             */
587            private void postJob(final ReasoningJob job) {
588                    logger.fine("Job posted: "+job);
589                    synchronized (passCounter) {
590                            if (passCounter[0] < ++job.pass) {
591                                    passCounter[0] = job.pass;
592                            }
593                    }
594                    
595                    if (job.pass>maxPass) {
596                            throw new RulesRuntimeException("Maximum number of reasoning jobs passes ("+maxPass+") has been exceeded for job "+job+". Probably there is an infinite loop in rules logic.");
597                    }
598                    
599                    Runnable workerJob = new Runnable() {
600    
601                            public void run() {
602                                    try {
603                                            logger.fine("Running reasoning job: "+job);
604                                            job.run();
605                                    } finally {
606                                            synchronized (jobCounter) {
607                                                    int jc = --jobCounter[0];
608                                                    logger.fine("Finished running reasoning job, job counter "+jc+", " +job);
609                                                    if (jc<=0) {
610                                                            logger.fine("All reasoning jobs finished");
611                                                            jobCounter.notifyAll();
612                                                    }
613                                            }
614                                    }                                       
615                            }
616                            
617                    };
618                    
619                    synchronized (jobCounter) {
620                            ++jobCounter[0];
621                    }
622                    
623                    if (worker==null || (maxWorkerJobs>0 && jobCounter[0]>=maxWorkerJobs)) {
624                            workerJob.run();
625                    } else {                        
626                            if (!worker.post(workerJob)) {
627                                    workerJob.run();
628                            }
629                    }
630            }
631            
632            /**
633             * This thread is activate when there are no more active jobs.
634             * The thread closes pending jobs to unblock pending iterators.
635             * @author Pavel
636             *
637             */
638            private class DepletionListener extends Thread {
639                    
640                    public DepletionListener() {
641                            super("Depletion listener");
642                    }
643                    
644                    public void run() {
645                            synchronized (jobCounter) {
646                                    while (jobCounter[0]>0) {
647                                            try {
648                                                    jobCounter.wait();
649                                            } catch (InterruptedException e) {
650                                                    logger.warning("Depletion listener was interrupted");
651                                            }
652                                    }
653                    
654                                    closeIncrementalJobs();
655                                    
656                                    synchronized (jobCounter) {
657                                            depleteionListener = null;
658                                    }
659                            }
660                    }
661                    
662            }
663            
664            private DepletionListener depleteionListener;
665            
666            /**
667             * If there is exact match, returns iterator for that match, combines
668             * subclasses otherwise.
669             */
670            public Iterator getFacts(final Class factType) {
671                                    
672                    List sources = new ArrayList();
673                    Iterator it = typeEntries.values().iterator();
674                    while (it.hasNext()) {
675                            TypeEntry te = (TypeEntry) it.next();
676                            if (factType.equals(te.type)) {
677                                    return te.getAccumulator().blockingIterator();
678                            }
679                            sources.add(te);
680                    }
681                    
682                    if (sources.isEmpty()) {
683                            return new Iterator() {
684    
685                                    public boolean hasNext() {
686                                            return false;
687                                    }
688    
689                                    public Object next() {
690                                            throw new NoSuchElementException();
691                                    }
692    
693                                    public void remove() {
694                                            throw new UnsupportedOperationException();
695                                    }
696                                    
697                            };
698                    }
699                    
700                    Accumulator ret = createAccumulator(factType);
701                    it = sources.iterator();
702                    while (it.hasNext()) {
703                            TypeEntry te = (TypeEntry) it.next();
704                            postJob(new AccumulatorJob("Superclass combiner", te.type, factType, te.lazyIterator(), ret));
705                    }
706    
707                    return ret.blockingIterator();
708            }
709            
710            /**
711             * This implementation creates simple accumulator which doesn't eliminate
712             * duplicates. If several rules come to the same conclusion, these 
713             * conclusions will not be merged. Override this method to produce 
714             * accumulator which eliminates duplicates and merges conclusions. 
715             * @param producers
716             * @return
717             */
718            protected Accumulator createAccumulator(Class type) {
719                    return new Accumulator(type) {
720                            
721                            protected void onBlock() {
722                                    logger.finest("Accumulator "+this+" enters blocking state, starting anti-blocking measures");
723                                    onAccumulatorBlock();
724                            }
725                            
726                    };
727            }
728            
729            protected void onAccumulatorBlock() {
730                    synchronized (jobCounter) {
731                            if (jobCounter[0]==0) {
732                                    closeIncrementalJobs();
733                            } else {
734                                    if (depleteionListener==null) {
735                                            depleteionListener = new DepletionListener();
736                                            depleteionListener.start();
737                                    }
738                            }
739                    }                                               
740            }
741            
742            private Class[] types = {};
743            
744            // Class -> List<FactSource|Rule>
745    //      private Map typeSourcesMap = new HashMap();
746    
747            public Class[] getFactTypes() {
748                    if (started) {
749                            return types;
750                    }
751                    throw new IllegalStateException("Fact types available after container start.");
752            }
753    
754            private void closeIncrementalJobs() {
755                    logger.fine("Closing incremental jobs");
756                    synchronized (incrementalJobsQueue) {
757                            Iterator it=incrementalJobsQueue.iterator();
758                            while (it.hasNext()) {
759                                    IncrementalJob job = (IncrementalJob) it.next();
760                                    if (!job.isComplete) {
761                                            logger.fine("Incremental job closed: "+job);
762                                            job.close();
763                                    }
764                            }
765                    }
766            }
767    }