001 package biz.hammurapi.metrics;
002
003 import java.io.Serializable;
004 import java.util.LinkedList;
005 import java.util.List;
006 import java.util.Timer;
007 import java.util.TimerTask;
008
009 import biz.hammurapi.config.Component;
010 import biz.hammurapi.config.ConfigurationException;
011 import biz.hammurapi.metrics.SimpleSlice;
012 import biz.hammurapi.metrics.Slice;
013 import biz.hammurapi.metrics.SliceConsumer;
014
015
016 /**
017 * Collects slices for a period of time and processes them in batches.
018 * @author Pavel Vlasov
019 */
020 public abstract class BatchingSliceConsumer implements SliceConsumer, Component {
021 private static final int MAX_SLICES = 10000; // maximum number of slices to keep in the send queue
022
023 private int maxSlices = MAX_SLICES;
024 private long interval = 60000; // Default batching period is one minute.
025
026 /**
027 * Sets maximum number of slices to be retained if processing fails.
028 * @param maxSlices
029 */
030 public void setMaxSlices(int maxSlices) {
031 this.maxSlices = maxSlices;
032 }
033
034 /**
035 * Sets batching interval. Default interval is one minute.
036 * @param interval
037 */
038 public void setInterval(long interval) {
039 this.interval = interval;
040 }
041
042 private LinkedList slices = new LinkedList();
043
044 private static final Timer timer = new Timer(true);
045 private Thread shutdownHook;
046
047 private TimerTask task;
048
049 /**
050 * Helper class to serialize accumulated slices.
051 * @author Pavel Vlasov
052 */
053 public static class SliceEntry implements Serializable {
054 String category;
055 Slice slice;
056
057 SliceEntry(String category, Slice slice) {
058 this.category = category;
059 if (slice instanceof SimpleSlice) {
060 this.slice = slice;
061 } else {
062 this.slice = new SimpleSlice(slice);
063 }
064 }
065
066 public String getCategory() {
067 return category;
068 }
069
070 public Slice getSlice() {
071 return slice;
072 }
073 }
074
075 public synchronized boolean consumeSlice(String category, Slice slice) {
076 slices.add(new SliceEntry(category, slice));
077 return true;
078 }
079
080 public void setOwner(Object owner) {
081 // Nothing
082
083 }
084
085 public void start() throws ConfigurationException {
086 if (interval<=0) {
087 throw new ConfigurationException("Invalid interval: "+interval);
088 }
089 task = new TimerTask() {
090
091 public void run() {
092 LinkedList slicesToProcess;
093 synchronized (BatchingSliceConsumer.this) {
094 if (slices.isEmpty()) {
095 return;
096 }
097 slicesToProcess = slices;
098 slices = new LinkedList();
099 }
100
101 if (!processSlices(slicesToProcess)) {
102 synchronized (BatchingSliceConsumer.this) {
103 slicesToProcess.addAll(slices);
104 slices = slicesToProcess;
105 while (slicesToProcess.size()>maxSlices) {
106 slicesToProcess.removeFirst();
107 }
108 }
109 }
110 }
111
112 };
113 timer.schedule(task, interval, interval);
114 shutdownHook = new Thread(task);
115 Runtime.getRuntime().addShutdownHook(shutdownHook);
116 }
117
118 public void stop() throws ConfigurationException {
119 if (task!=null) {
120 task.cancel();
121 if (shutdownHook!=null) {
122 try {
123 Runtime.getRuntime().removeShutdownHook(shutdownHook);
124 } catch (Exception e) {
125 // We don't care
126 }
127 }
128 task.run();
129 }
130 }
131
132 /**
133 * Subclasses shall implement this method.
134 * @param slices
135 * @return true if slices was successfully processed.
136 */
137 protected abstract boolean processSlices(List slices);
138
139 }