001 /* 002 * hammurapi-rules @mesopotamia.version@ 003 * Hammurapi rules engine. 004 * Copyright (C) 2005 Hammurapi Group 005 * 006 * This program is free software; you can redistribute it and/or 007 * modify it under the terms of the GNU Lesser General Public 008 * License as published by the Free Software Foundation; either 009 * version 2 of the License, or (at your option) any later version. 010 * 011 * This program is distributed in the hope that it will be useful, 012 * but WITHOUT ANY WARRANTY; without even the implied warranty of 013 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 014 * Lesser General Public License for more details. 015 * 016 * You should have received a copy of the GNU Lesser General Public 017 * License along with this library; if not, write to the Free Software 018 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 019 * 020 * URL: http://http://www.hammurapi.biz 021 * e-Mail: support@hammurapi.biz 022 */ 023 package biz.hammurapi.rules.backwardreasoning; 024 025 import java.lang.reflect.UndeclaredThrowableException; 026 import java.util.ArrayList; 027 import java.util.Collection; 028 import java.util.Collections; 029 import java.util.HashMap; 030 import java.util.Iterator; 031 import java.util.List; 032 import java.util.Map; 033 import java.util.NoSuchElementException; 034 import java.util.Set; 035 import java.util.logging.Level; 036 import java.util.logging.Logger; 037 038 import org.w3c.dom.Element; 039 import org.w3c.dom.Node; 040 041 import biz.hammurapi.config.ConfigurationException; 042 import biz.hammurapi.config.DomConfigurableContainer; 043 import biz.hammurapi.config.RuntimeConfigurationException; 044 import biz.hammurapi.dispatch.InvocationHandler; 045 import biz.hammurapi.dispatch.ResultConsumer; 046 import biz.hammurapi.rules.CollectionManager; 047 import biz.hammurapi.rules.Constants; 048 import biz.hammurapi.rules.FactSource; 049 import biz.hammurapi.rules.Rule; 050 import biz.hammurapi.rules.RulesRuntimeException; 051 import biz.hammurapi.rules.backwardreasoning.Accumulator.AccumulatorIterator; 052 import biz.hammurapi.util.ExceptionSink; 053 import biz.hammurapi.util.Worker; 054 import biz.hammurapi.xml.dom.DOMUtils; 055 056 /** 057 * Backward reasoning rules container. 058 * How to use: 059 * <OL> 060 * <LI>Instantiate from XML definition.</LI> 061 * <LI>Set collection manager if rule set contains join rules (inference methods with more than one parameter).</LI> 062 * <LI>Set worker for concurrent reasoning. (Optional)</LI> 063 * <LI>Set exception sink. Otherwise all exceptions will be reported to the logger. (Optional)</LI> 064 * <LI>Set fact sources.</LI> 065 * <LI>Invoke getFacts() for fact/conclusion type of interest.</LI> 066 * </OL> 067 * 068 * @author Pavel Vlasov 069 * @revision $Revision$ 070 */ 071 public class BackwardReasoningRulesContainer extends DomConfigurableContainer implements Constants, FactSource { 072 073 private static final Logger logger = Logger.getLogger(BackwardReasoningRulesContainer.class.getName()); 074 075 // Primarily for troubleshooting. 076 private Collection allJobs = new ArrayList(); 077 078 private static int[] jobNumberCounter = {0}; 079 080 /** 081 * Job with number. 082 */ 083 private abstract class ReasoningJob implements Runnable { 084 085 int number; 086 int pass; 087 String name; 088 boolean isComplete; 089 boolean isInProgress; 090 091 // Each job works on one input fact type. 092 protected Class inputType; 093 094 public ReasoningJob(String name, Class inputType) { 095 synchronized (jobNumberCounter) { 096 number = ++jobNumberCounter[0]; 097 } 098 this.name = name; 099 this.inputType = inputType; 100 synchronized (allJobs) { 101 allJobs.add(this); 102 } 103 logger.fine("New job: "+this); 104 } 105 106 public String toString() { 107 return "Job # "+number+" pass "+pass+" '"+name+"' working on "+inputType.getName()+" ("+getClass().getName()+")"; 108 } 109 110 } 111 112 /** 113 * Job which processes data from accumulator iterator, i.e. 114 * new items may become available in the iterator and the job 115 * needs to be repeated until iterator is closed. 116 * 117 * The job adds itself to incremental jobs queue if there are 118 * no more items and iterator hasNext() returned false. Incremental 119 * jobs are accumulated and are sent to execution after some progress 120 * has been made by other jobs. 121 * @author Pavel 122 * 123 */ 124 private abstract class IncrementalJob extends ReasoningJob { 125 126 AccumulatorIterator iterator; 127 128 IncrementalJob(String name, Class inputType, AccumulatorIterator iterator) { 129 super(name, inputType); 130 this.iterator = iterator; 131 this.inputType = inputType; 132 } 133 134 public void run() { 135 logger.fine("Running "+this.toString()+" in "+Thread.currentThread().getName()); 136 isInProgress = true; 137 while (iterator.hasNext()) { 138 doIteration(iterator.next()); 139 } 140 141 if (iterator.isClosed()) { 142 isComplete = true; 143 close(); 144 synchronized (allJobs) { 145 allJobs.remove(this); 146 logger.fine("Job finished: "+this.toString()+" in "+Thread.currentThread().getName()); 147 } 148 } else { 149 logger.fine("Job sent to next pass: "+this.toString()+" in "+Thread.currentThread().getName()); 150 queueIncrementalJob(this); 151 } 152 } 153 154 /** 155 * This method is invoked for each iterator value. 156 * If the job produced conclusions from that value, 157 * it should invoke onProgress(jobNumber, factTypes) to inform 158 * other jobs that there is additional information 159 * for them. 160 * @param next 161 */ 162 abstract void doIteration(Object next); 163 164 abstract void close(); 165 166 } 167 168 /** 169 * This class pumps data from one accumulator to another (from subtype to type). 170 * @author Pavel 171 * 172 */ 173 private class AccumulatorJob extends IncrementalJob { 174 175 Accumulator accumulator; 176 Class outputType; 177 178 AccumulatorJob(String name, Class inputType, Class outputType, AccumulatorIterator iterator, Accumulator accumulator) { 179 super(name, inputType, iterator); 180 this.accumulator = accumulator; 181 accumulator.addProducer(this); 182 this.outputType = outputType; 183 } 184 185 void close() { 186 logger.fine("Accumulator job closed: "+this); 187 accumulator.close(this); 188 } 189 190 void doIteration(Object next) { 191 if (next!=null) { 192 accumulator.add(next); 193 onProgress(this, outputType); 194 } 195 } 196 197 } 198 199 /** 200 * Reads facts from external fact source to accumulator. 201 * @author Pavel 202 */ 203 private class FactSourceJob extends ReasoningJob { 204 205 private FactSource factSource; 206 private Accumulator accumulator; 207 208 public FactSourceJob(FactSource factSource, Class inputType) { 209 super("Reads "+inputType.getName()+" external fact source "+factSource, inputType); 210 this.factSource = factSource; 211 TypeEntry te = getTypeEntry(inputType); 212 accumulator = te.addJob(this); 213 } 214 215 public void run() { 216 isInProgress = true; 217 try { 218 Iterator it = factSource.getFacts(inputType); 219 while (it.hasNext()) { 220 Object next = it.next(); 221 if (next!=null) { 222 accumulator.add(next); 223 onProgress(this, inputType); 224 } 225 } 226 } finally { 227 accumulator.close(this); 228 isComplete = true; 229 synchronized (allJobs) { 230 allJobs.remove(this); 231 } 232 } 233 } 234 } 235 236 /** 237 * Executes invocation handler 238 * @author Pavel 239 * 240 */ 241 private class InvocationHandlerJob extends IncrementalJob { 242 243 InvocationHandler invocationHandler; 244 Class[] outputTypes; 245 ResultConsumer resultConsumer; 246 247 // Class -> Accumulator 248 Map accumulatorMap = new HashMap(); 249 250 InvocationHandlerJob(String name, InvocationHandler invocationHandler) { 251 super(name, invocationHandler.getParameterType(), getTypeEntry(invocationHandler.getParameterType()).lazyIterator()); 252 this.invocationHandler = invocationHandler; 253 254 outputTypes = invocationHandler.getFactTypes(); 255 for (int i=0; i<outputTypes.length; ++i) { 256 accumulatorMap.put(outputTypes[i], getTypeEntry(outputTypes[i]).addJob(this)); 257 } 258 259 resultConsumer = new ResultConsumer() { 260 261 public void consume(Object result) { 262 for (int i=0; i<outputTypes.length; ++i) { 263 if (outputTypes[i].isInstance(result)) { 264 ((Accumulator) accumulatorMap.get(outputTypes[i])).add(result); 265 onProgress(InvocationHandlerJob.this, outputTypes[i]); 266 } 267 } 268 } 269 270 }; 271 } 272 273 void close() { 274 logger.fine("Invocation handler job closed: "+this); 275 for (int i=0; i<outputTypes.length; ++i) { 276 ((Accumulator) accumulatorMap.get(outputTypes[i])).close(this); 277 } 278 } 279 280 void doIteration(Object next) { 281 if (inputType.isInstance(next)) { 282 try { 283 invocationHandler.invoke(next, resultConsumer); 284 } catch (Exception e) { 285 if (exceptionSink==null) { 286 logger.log(Level.SEVERE, "Exception in "+this+": "+e, e); 287 } else { 288 exceptionSink.consume(this, e); 289 } 290 } catch (Throwable th) { 291 if (th instanceof Error) { 292 throw (Error) th; 293 } 294 throw new UndeclaredThrowableException(th); 295 } 296 } 297 } 298 } 299 300 // Class -> TypeEntry 301 private Map typeEntries = new HashMap(); 302 303 /** 304 * 305 * @author Pavel 306 * 307 */ 308 private class TypeEntry { 309 Class type; 310 Accumulator theAccumulator; 311 Collection thePendingJobs = new ArrayList(); 312 313 TypeEntry(Class type) { 314 this.type = type; 315 theAccumulator = createAccumulator(type); 316 } 317 318 Accumulator addJob(ReasoningJob job) { 319 thePendingJobs.add(job); 320 theAccumulator.addProducer(job); 321 return theAccumulator; 322 } 323 324 /** 325 * If there are pending jobs for the accumulator, they are posted to execution. 326 * @return Accumulator with all jobs scheduled for execution. 327 */ 328 Accumulator getAccumulator() { 329 Collection toPost = new ArrayList(); 330 synchronized (thePendingJobs) { 331 Iterator it = thePendingJobs.iterator(); 332 while (it.hasNext()) { 333 ReasoningJob pendingJob = (ReasoningJob) it.next(); 334 synchronized (pendingJob) { 335 if (pendingJob.isComplete || pendingJob.isInProgress) { 336 it.remove(); 337 } else { 338 pendingJob.isInProgress = true; 339 toPost.add(pendingJob); 340 } 341 } 342 } 343 } 344 345 Iterator it = toPost.iterator(); 346 while (it.hasNext()) { 347 postJob((ReasoningJob) it.next()); 348 } 349 return theAccumulator; 350 } 351 352 353 /** 354 * @return Iterator which activates pending jobs on first access. 355 */ 356 AccumulatorIterator lazyIterator() { 357 return new Accumulator.AccumulatorIterator() { 358 359 AccumulatorIterator master; 360 361 AccumulatorIterator getMaster() { 362 if (master==null) { 363 master = getAccumulator().iterator(); 364 } 365 return master; 366 } 367 368 public boolean isClosed() { 369 return getMaster().isClosed(); 370 } 371 372 public boolean hasNext() { 373 return getMaster().hasNext(); 374 } 375 376 public Object next() { 377 return getMaster().next(); 378 } 379 380 public void remove() { 381 getMaster().remove(); 382 } 383 384 }; 385 } 386 } 387 388 private Worker worker; 389 private Collection factSources = new ArrayList(); 390 private int maxWorkerJobs; 391 private ExceptionSink exceptionSink; 392 393 private boolean started; 394 395 /** 396 * Sets single fact source. 397 * Resets reasoning results. 398 * @param factSource 399 */ 400 public void addFactSource(FactSource factSource) { 401 if (started) { 402 throw new IllegalStateException("Fact source shall be set before container is started"); 403 } 404 factSources.add(factSource); 405 } 406 407 /** 408 * Exception sink to consume exceptions during reasoning. 409 * If exception sink is not set, exceptions are sent to logger. 410 * @param exceptionSink 411 */ 412 public void setExceptionSink(ExceptionSink exceptionSink) { 413 this.exceptionSink = exceptionSink; 414 } 415 416 /** 417 * When concurrent reasoning is used, deadlocks are possible depending on worker implementation/configuration. 418 * If excessive jobs are queued, a situation is possible when all active type jobs wait for a job in a queue to 419 * complete and that job never gets to execution. 420 * 421 * Setting maxWorkerJobs to value greater than zero will limit number of concurrent jobs posted 422 * to worker. Excessive jobs will be executed in invoking thread, to avoid 423 * being queued by the worker. 424 * @param maxWorkerJobs 425 */ 426 public void setMaxWorkerJobs(int maxWorkerJobs) { 427 this.maxWorkerJobs = maxWorkerJobs; 428 } 429 430 /** 431 * Sets multiple fact sources. 432 * Resets reasoning results. 433 * @param factSources 434 */ 435 public void addFactSources(Collection factSources) { 436 if (started) { 437 throw new IllegalStateException("Fact sources shall be set before container is started."); 438 } 439 this.factSources.addAll(factSources); 440 } 441 442 private TypeEntry getTypeEntry(Class type) { 443 TypeEntry ret = (TypeEntry) typeEntries.get(type); 444 if (ret == null) { 445 ret = new TypeEntry(type); 446 typeEntries.put(type, ret); 447 } 448 return ret; 449 } 450 451 // Maximum number of times incremental job can be invoked. 452 protected int maxPass; 453 454 public void start() throws ConfigurationException { 455 super.start(); 456 started = true; 457 Iterator fit = factSources.iterator(); 458 while (fit.hasNext()) { 459 FactSource fs = (FactSource) fit.next(); 460 Class[] fst = fs.getFactTypes(); 461 for (int i=0; i<fst.length; ++i) { 462 new FactSourceJob(fs, fst[i]); 463 } 464 } 465 466 Iterator rit = getComponents().iterator(); 467 while (rit.hasNext()) { 468 Object component = rit.next(); 469 if (component instanceof Rule) { 470 Rule rule = (Rule) component; 471 Iterator ihit = rule.getInvocationHandlers().iterator(); 472 while (ihit.hasNext()) { 473 InvocationHandler ih = (InvocationHandler) ihit.next(); 474 new InvocationHandlerJob(rule.getName(), ih); 475 } 476 } 477 } 478 479 Set typeSet = typeEntries.keySet(); 480 types = new Class[typeSet.size()]; 481 Iterator tit = typeSet.iterator(); 482 for (int i=0; tit.hasNext(); ++i) { 483 types[i] = (Class) tit.next(); 484 } 485 486 maxPass = allJobs.size()*allJobs.size()*1000; 487 488 Iterator it = allJobs.iterator(); 489 if (logger.isLoggable(Level.FINEST)) { 490 logger.finest("All jobs created"); 491 while (it.hasNext()) { 492 logger.finest("\t"+it.next()); 493 } 494 } 495 } 496 497 /** 498 * Collection manager is required for join (multi-parameter) rules. 499 * @param collectionManager 500 * @throws ConfigurationException 501 */ 502 public void setCollectionManager(CollectionManager collectionManager) throws ConfigurationException { 503 addComponent(COLLECTION_MANAGER, collectionManager); 504 } 505 506 /** 507 * Set worker for concurrent reasoning. E.g. if rules access database or 508 * network resources for different types of facts, retrieval of these facts 509 * can be done in parallel. 510 * @param worker 511 */ 512 public void setWorker(Worker worker) { 513 this.worker = worker; 514 } 515 516 public Collection getRules() { 517 return Collections.unmodifiableCollection(getComponents()); 518 } 519 520 protected String getComponentName(Node node) { 521 try { 522 return DOMUtils.getSingleNonBlankElementText((Element) node, "name"); 523 } catch (Exception e) { 524 throw new RuntimeConfigurationException("Cannot read rule name: "+e, e); 525 } 526 } 527 528 /** 529 * This method blocks until all reasoning jobs finish execution. 530 * If worker is not set, this method returns immediately. 531 * @throws InterruptedException 532 */ 533 public void join() throws InterruptedException { 534 if (worker!=null) { 535 synchronized (jobCounter) { 536 while (jobCounter[0]>0) { 537 jobCounter.wait(); 538 } 539 } 540 } 541 } 542 543 private Collection incrementalJobsQueue = new ArrayList(); 544 545 private void queueIncrementalJob(IncrementalJob job) { 546 synchronized (incrementalJobsQueue) { 547 incrementalJobsQueue.add(job); 548 incrementalJobsQueue.notifyAll(); // just for the case 549 } 550 } 551 552 /** 553 * Method to inform the reasoning system that job made progress 554 * on producing facts of given type. 555 */ 556 private void onProgress(ReasoningJob job, Class factType) { 557 logger.finest("Job "+job+" made progress on "+factType.getName()); 558 Collection toActivate = new ArrayList(); 559 synchronized (incrementalJobsQueue) { 560 Iterator it = incrementalJobsQueue.iterator(); 561 while (it.hasNext()) { 562 IncrementalJob iJob = (IncrementalJob) it.next(); 563 if (iJob.inputType.equals(factType)) { 564 toActivate.add(iJob); 565 logger.fine("Incremental job is sent to additional pass: "+iJob); 566 it.remove(); 567 } 568 } 569 } 570 571 Iterator it = toActivate.iterator(); 572 while (it.hasNext()) { 573 postJob((ReasoningJob) it.next()); 574 } 575 } 576 577 // Every time job goes to execution it is assigned incremental number. 578 private int[] passCounter = {0}; 579 580 // Counts active jobs. 581 private int[] jobCounter = {0}; 582 583 /** 584 * Executes job either in the current thread or in a worker thread. 585 * @param job 586 */ 587 private void postJob(final ReasoningJob job) { 588 logger.fine("Job posted: "+job); 589 synchronized (passCounter) { 590 if (passCounter[0] < ++job.pass) { 591 passCounter[0] = job.pass; 592 } 593 } 594 595 if (job.pass>maxPass) { 596 throw new RulesRuntimeException("Maximum number of reasoning jobs passes ("+maxPass+") has been exceeded for job "+job+". Probably there is an infinite loop in rules logic."); 597 } 598 599 Runnable workerJob = new Runnable() { 600 601 public void run() { 602 try { 603 logger.fine("Running reasoning job: "+job); 604 job.run(); 605 } finally { 606 synchronized (jobCounter) { 607 int jc = --jobCounter[0]; 608 logger.fine("Finished running reasoning job, job counter "+jc+", " +job); 609 if (jc<=0) { 610 logger.fine("All reasoning jobs finished"); 611 jobCounter.notifyAll(); 612 } 613 } 614 } 615 } 616 617 }; 618 619 synchronized (jobCounter) { 620 ++jobCounter[0]; 621 } 622 623 if (worker==null || (maxWorkerJobs>0 && jobCounter[0]>=maxWorkerJobs)) { 624 workerJob.run(); 625 } else { 626 if (!worker.post(workerJob)) { 627 workerJob.run(); 628 } 629 } 630 } 631 632 /** 633 * This thread is activate when there are no more active jobs. 634 * The thread closes pending jobs to unblock pending iterators. 635 * @author Pavel 636 * 637 */ 638 private class DepletionListener extends Thread { 639 640 public DepletionListener() { 641 super("Depletion listener"); 642 } 643 644 public void run() { 645 synchronized (jobCounter) { 646 while (jobCounter[0]>0) { 647 try { 648 jobCounter.wait(); 649 } catch (InterruptedException e) { 650 logger.warning("Depletion listener was interrupted"); 651 } 652 } 653 654 closeIncrementalJobs(); 655 656 synchronized (jobCounter) { 657 depleteionListener = null; 658 } 659 } 660 } 661 662 } 663 664 private DepletionListener depleteionListener; 665 666 /** 667 * If there is exact match, returns iterator for that match, combines 668 * subclasses otherwise. 669 */ 670 public Iterator getFacts(final Class factType) { 671 672 List sources = new ArrayList(); 673 Iterator it = typeEntries.values().iterator(); 674 while (it.hasNext()) { 675 TypeEntry te = (TypeEntry) it.next(); 676 if (factType.equals(te.type)) { 677 return te.getAccumulator().blockingIterator(); 678 } 679 sources.add(te); 680 } 681 682 if (sources.isEmpty()) { 683 return new Iterator() { 684 685 public boolean hasNext() { 686 return false; 687 } 688 689 public Object next() { 690 throw new NoSuchElementException(); 691 } 692 693 public void remove() { 694 throw new UnsupportedOperationException(); 695 } 696 697 }; 698 } 699 700 Accumulator ret = createAccumulator(factType); 701 it = sources.iterator(); 702 while (it.hasNext()) { 703 TypeEntry te = (TypeEntry) it.next(); 704 postJob(new AccumulatorJob("Superclass combiner", te.type, factType, te.lazyIterator(), ret)); 705 } 706 707 return ret.blockingIterator(); 708 } 709 710 /** 711 * This implementation creates simple accumulator which doesn't eliminate 712 * duplicates. If several rules come to the same conclusion, these 713 * conclusions will not be merged. Override this method to produce 714 * accumulator which eliminates duplicates and merges conclusions. 715 * @param producers 716 * @return 717 */ 718 protected Accumulator createAccumulator(Class type) { 719 return new Accumulator(type) { 720 721 protected void onBlock() { 722 logger.finest("Accumulator "+this+" enters blocking state, starting anti-blocking measures"); 723 onAccumulatorBlock(); 724 } 725 726 }; 727 } 728 729 protected void onAccumulatorBlock() { 730 synchronized (jobCounter) { 731 if (jobCounter[0]==0) { 732 closeIncrementalJobs(); 733 } else { 734 if (depleteionListener==null) { 735 depleteionListener = new DepletionListener(); 736 depleteionListener.start(); 737 } 738 } 739 } 740 } 741 742 private Class[] types = {}; 743 744 // Class -> List<FactSource|Rule> 745 // private Map typeSourcesMap = new HashMap(); 746 747 public Class[] getFactTypes() { 748 if (started) { 749 return types; 750 } 751 throw new IllegalStateException("Fact types available after container start."); 752 } 753 754 private void closeIncrementalJobs() { 755 logger.fine("Closing incremental jobs"); 756 synchronized (incrementalJobsQueue) { 757 Iterator it=incrementalJobsQueue.iterator(); 758 while (it.hasNext()) { 759 IncrementalJob job = (IncrementalJob) it.next(); 760 if (!job.isComplete) { 761 logger.fine("Incremental job closed: "+job); 762 job.close(); 763 } 764 } 765 } 766 } 767 }