001 /*
002 @license.text@
003 */
004 package biz.hammurapi.metrics;
005
006 import java.util.Collection;
007 import java.util.HashMap;
008 import java.util.Iterator;
009 import java.util.LinkedList;
010 import java.util.Map;
011 import java.util.Timer;
012 import java.util.TimerTask;
013
014 import biz.hammurapi.config.Component;
015 import biz.hammurapi.config.ConfigurationException;
016
017 /**
018 * Slices metrics.
019 * @author Pavel Vlasov
020 * @version $Revision: 1.3 $
021 */
022 public class SlicingMeasurementConsumer implements MeasurementConsumer, Component {
023
024 private class SliceEntry {
025 String category;
026 Slice slice;
027 /**
028 * @param category
029 * @param slice
030 */
031 SliceEntry(String category, Slice slice) {
032 super();
033 this.category = category;
034 this.slice = slice;
035 }
036 }
037
038 private Map slices=new HashMap();
039 private LinkedList sliceQueue=new LinkedList();
040 private boolean keepMeasurements=false;
041 private long tick=60000;
042 private int maxQueue=1000;
043 private SliceConsumer sliceConsumer=new ConsoleSliceConsumer();
044 private Timer timer;
045 private boolean isOwnTimer;
046
047 protected SliceConsumer getSliceConsumer() {
048 return sliceConsumer;
049 }
050
051 /**
052 * Creates a new instance with internal timer.
053 * @param tick Slice size in milliseconds
054 * @param keepMeasurements If true individual measurements are reported, only aggregated values otherwise
055 * @param maxQueue Maximum number of slices pending to be consumed. 0 - no limit. If sampling ratio is higher than
056 * consuming ration then excessive slices will be dropped with a notice on console.
057 */
058 public SlicingMeasurementConsumer(long tick, boolean keepMeasurements, int maxQueue, SliceConsumer sliceConsumer) {
059 this(tick, keepMeasurements, maxQueue, sliceConsumer, null);
060 }
061
062 /**
063 * Creates a new instance with internal timer.
064 * @param tick Slice size in milliseconds
065 * @param keepMeasurements If true individual measurements are reported, only aggregated values otherwise
066 * @param maxQueue Maximum number of slices pending to be consumed. 0 - no limit. If sampling ratio is higher than
067 * consuming ration then excessive slices will be dropped with a notice on console.
068 * @param timer Timer to use for slicing metrics and passing them to slice consumer. If it is null then an internal timer is
069 * created.
070 */
071 public SlicingMeasurementConsumer(long tick, boolean keepMeasurements, int maxQueue, SliceConsumer sliceConsumer, Timer timer) {
072 super();
073 this.keepMeasurements = keepMeasurements;
074 this.tick=tick;
075 this.maxQueue=maxQueue;
076 this.sliceConsumer=sliceConsumer;
077 this.timer=timer;
078 }
079
080 /**
081 * Default constructor with default settings.
082 */
083 public SlicingMeasurementConsumer() {
084 super();
085 }
086
087 public void addMeasurement(String name, double value, long time) {
088 synchronized (slices) {
089 Slice slice=(Slice) slices.get(name);
090 if (slice==null) {
091 slice=new SimpleSlice(name, keepMeasurements);
092 slices.put(name, slice);
093 }
094 slice.add(value, time);
095
096 if (slice.getTo()-slice.getFrom()>=tick) {
097 slices.remove(name);
098 addSliceToQueue(null, slice);
099 }
100 }
101 }
102
103 private Map instances=new HashMap();
104
105 private class CategorizedConsumer implements MeasurementConsumer, Component {
106
107
108 String category;
109 Map slices=new HashMap();
110
111 /**
112 * @param category
113 */
114 public CategorizedConsumer(String category) {
115 super();
116 this.category = category;
117 }
118
119 public void addMeasurement(String name, double value, long time) {
120 synchronized (slices) {
121 Slice slice=(Slice) slices.get(name);
122 if (slice==null) {
123 slice=new SimpleSlice(name, keepMeasurements);
124 slices.put(name, slice);
125 }
126 slice.add(value, time);
127
128 if (slice.getTo()-slice.getFrom()>=tick) {
129 slices.remove(name);
130 addSliceToQueue(category, slice);
131 }
132 }
133 }
134
135 public void start() throws ConfigurationException {
136 SlicingMeasurementConsumer.this.start();
137 }
138
139 public void stop() throws ConfigurationException {
140 SlicingMeasurementConsumer.this.stop();
141 }
142
143 public void setOwner(Object owner) {
144 // Ignore
145 }
146 }
147
148 /**
149 * @param category
150 * @return Instance for a category.
151 */
152 public MeasurementConsumer getCategoryInstance(final String category) {
153 synchronized (instances) {
154 MeasurementConsumer ret = (MeasurementConsumer) instances.get(category);
155 if (ret==null) {
156 ret = new CategorizedConsumer(category);
157 instances.put(category, ret);
158 }
159 return ret;
160 }
161 }
162
163 private int droppedCounter;
164 private long firstDropped;
165 private Thread slicingThread;
166
167 public void shutdown() {
168 to=System.currentTimeMillis();
169
170 synchronized (slices) {
171 Iterator it=slices.values().iterator();
172 while (it.hasNext()) {
173 Slice slice=(Slice) it.next();
174 it.remove();
175 addSliceToQueue(null, slice);
176 }
177 }
178
179 synchronized (instances) {
180 Iterator iit=instances.values().iterator();
181 while (iit.hasNext()) {
182 CategorizedConsumer cConsumer=(CategorizedConsumer) iit.next();
183 synchronized (cConsumer.slices) {
184 Iterator it=cConsumer.slices.values().iterator();
185 while (it.hasNext()) {
186 Slice slice=(Slice) it.next();
187 it.remove();
188 addSliceToQueue(cConsumer.category, slice);
189 }
190 }
191 }
192 }
193
194 // MeasurementCategoryFactory.unregister(this); - Unregister explicitly!!!
195 addSliceToQueue(null,null);
196 try {
197 slicingThread.join();
198 } catch (InterruptedException e) {
199 throw new MetricsException(e);
200 }
201
202 // Stop the time is it is our own.
203 if (isOwnTimer) {
204 timer.cancel();
205 }
206 }
207
208 /**
209 *
210 */
211 private void addSliceToQueue(String category, final Slice slice) {
212 synchronized (sliceQueue) {
213 if (slice!=null && maxQueue!=0 && sliceQueue.size()>maxQueue) {
214 firstDropped=slice.getTo();
215 droppedCounter++;
216 } else {
217 if (droppedCounter>0) {
218 sliceQueue.add(
219 new SliceEntry(
220 "DroppedSlices",
221 new Slice() {
222
223 public long getFrom() {
224 return firstDropped;
225 }
226
227 public long getTo() {
228 return slice.getTo();
229 }
230
231 public int getNumber() {
232 return droppedCounter;
233 }
234
235 public double getMin() {
236 return 0;
237 }
238
239 public double getMax() {
240 return 0;
241 }
242
243 public double getAvg() {
244 return 0;
245 }
246
247 public double getTotal() {
248 return 0;
249 }
250
251 public void add(double value, long time) {
252 throw new UnsupportedOperationException();
253 }
254
255 public void add(Metric metric) {
256 throw new UnsupportedOperationException();
257 }
258
259 public Collection getMeasurements() {
260 return null;
261 }
262
263 public String getName() {
264 return "DROPPED SLICES";
265 }
266
267 public double getDeviation() {
268 return 0;
269 }
270
271 }));
272 droppedCounter=0;
273 }
274
275 sliceQueue.add(slice==null ? null : new SliceEntry(category, slice));
276 sliceQueue.notifyAll();
277 }
278 }
279 }
280
281 /**
282 *
283 */
284 private void onTick() {
285 long now=System.currentTimeMillis();
286
287 synchronized (slices) {
288 Iterator it=slices.values().iterator();
289 while (it.hasNext()) {
290 Slice slice=(Slice) it.next();
291 if (now-slice.getFrom()>=tick) {
292 it.remove();
293 addSliceToQueue(null, slice);
294 }
295 }
296 }
297
298 synchronized (instances) {
299 Iterator iit=instances.values().iterator();
300 while (iit.hasNext()) {
301 CategorizedConsumer cConsumer=(CategorizedConsumer) iit.next();
302 synchronized (cConsumer.slices) {
303 Iterator it=cConsumer.slices.values().iterator();
304 while (it.hasNext()) {
305 Slice slice=(Slice) it.next();
306 if (now-slice.getFrom()>=tick) {
307 it.remove();
308 addSliceToQueue(cConsumer.category, slice);
309 }
310 }
311 }
312 }
313 }
314
315 if (sliceConsumer instanceof HousekeepingSliceConsumer) {
316 ((HousekeepingSliceConsumer) sliceConsumer).onTick(now);
317 }
318 }
319
320 {
321 slicingThread=new Thread() {
322 {
323 setName("Slice queue processor");
324 setDaemon(true);
325 setPriority(Thread.MIN_PRIORITY);
326 start();
327 }
328
329 private SliceEntry getSliceEntry() throws InterruptedException {
330 synchronized (sliceQueue) {
331 while (sliceQueue.isEmpty()) {
332 sliceQueue.wait();
333 }
334
335 return (SliceEntry) sliceQueue.removeFirst();
336 }
337 }
338
339 public void run() {
340 while (true) {
341 try {
342 SliceEntry entry=getSliceEntry();
343 if (entry==null) {
344 if (sliceConsumer instanceof Component) {
345 try {
346 ((Component) sliceConsumer).stop();
347 } catch (ConfigurationException e1) {
348 e1.printStackTrace();
349 }
350 }
351 return;
352 }
353
354 // Put entry back to queue if slice consumer is unable to consume
355 // and sleep.
356 if (!sliceConsumer.consumeSlice(entry.category, entry.slice)) {
357 synchronized (sliceQueue) {
358 sliceQueue.add(entry);
359 }
360 sleep(tick);
361 }
362 } catch (InterruptedException e) {
363 return;
364 }
365 }
366 }
367 };
368 }
369
370 private int useCounter;
371
372 protected long from;
373 protected long to;
374
375 private TimerTask tickTask;
376
377 /**
378 * Increments use counter
379 */
380 public void start() throws ConfigurationException {
381 if (timer==null) {
382 timer=new Timer(true);
383 isOwnTimer=true;
384 }
385
386 tickTask = new TimerTask() {
387 public void run() {
388 onTick();
389 }
390 };
391
392 timer.scheduleAtFixedRate(tickTask, tick, tick);
393
394 if (useCounter==0) {
395 from=System.currentTimeMillis();
396 if (sliceConsumer instanceof Component) {
397 ((Component) sliceConsumer).start();
398 }
399 }
400
401 ++useCounter;
402 }
403
404 /**
405 * Decrements use counter and invokes shutdown() when counter==0
406 */
407 public void stop() throws ConfigurationException {
408 if (--useCounter==0) {
409 shutdown();
410 }
411 }
412
413 public void setOwner(Object owner) {
414 // Ignore
415 }
416
417 public int getMaxQueue() {
418 return maxQueue;
419 }
420
421 public void setMaxQueue(int maxQueue) {
422 this.maxQueue = maxQueue;
423 }
424
425 public long getTick() {
426 return tick;
427 }
428
429 public void setTick(long tick) {
430 this.tick = tick;
431 }
432
433 public void setSliceConsumer(SliceConsumer sliceConsumer) {
434 this.sliceConsumer = sliceConsumer;
435 }
436
437 public boolean isKeepMeasurements() {
438 return keepMeasurements;
439 }
440
441 public void setKeepMeasurements(boolean keepMeasurements) {
442 this.keepMeasurements = keepMeasurements;
443 }
444
445
446 }