001 package biz.hammurapi.metrics;
002
003 import java.io.IOException;
004 import java.io.ObjectInputStream;
005 import java.net.InetAddress;
006 import java.net.ServerSocket;
007 import java.net.Socket;
008 import java.util.Iterator;
009 import java.util.List;
010
011 import biz.hammurapi.config.Component;
012 import biz.hammurapi.config.ConfigurationException;
013 import biz.hammurapi.metrics.ConsoleSliceConsumer;
014 import biz.hammurapi.metrics.SliceConsumer;
015 import biz.hammurapi.metrics.BatchingSliceConsumer.SliceEntry;
016
017
018 public class SocketSliceConsumerServer implements Component {
019
020 private SliceConsumer delegate = new ConsoleSliceConsumer();
021
022 private int port = SocketSliceConsumer.DEFAULT_PORT;
023
024 /**
025 * Set server listening port
026 * @param port
027 */
028 public void setPort(int port) {
029 this.port = port;
030 }
031
032 ServerSocket serverSocket;
033
034 public void setOwner(Object owner) {
035 // Nothing
036
037 }
038
039 public void start() throws ConfigurationException {
040 try {
041 serverSocket = new ServerSocket(port);
042 Thread listeningThread = new Thread() {
043 public void run() {
044 while (true) {
045 try {
046 process(serverSocket.accept());
047 } catch (IOException e) {
048 consumeException(e);
049 return;
050 }
051 }
052 }
053 };
054 listeningThread.setDaemon(true);
055 listeningThread.setName("Slice consumer listener");
056 listeningThread.start();
057 } catch (IOException e) {
058 throw new ConfigurationException("Could not create server socket: "+e,e);
059 }
060 }
061
062 public void stop() throws ConfigurationException {
063 if (serverSocket!=null) {
064 try {
065 serverSocket.close();
066 } catch (IOException e) {
067 throw new ConfigurationException("Could not close server socket: "+e, e);
068 }
069 }
070 }
071
072 /**
073 * @return SliceConsumer to delegate processing. This class returns ConsoleSliceConsumer.
074 * Subclasses can override this method.
075 */
076 protected SliceConsumer getDelegate() {
077 return delegate;
078 }
079
080 /**
081 * This method iterates over slices in the batch
082 * and sends them to the delegate SliceConsumer returned
083 * by getDelegate() method.
084 * @param slices List of slice entries obtained from clients
085 */
086 protected void processSlices(List slices, InetAddress address, String id) {
087 Iterator it = slices.iterator();
088 while (it.hasNext()) {
089 SliceEntry se = (SliceEntry) it.next();
090 getDelegate().consumeSlice("["+address+":"+id+"] "+se.getCategory(), se.getSlice());
091 }
092 }
093
094 /**
095 * Spawns a new thread for each socket
096 * @param socket
097 */
098 protected void process(final Socket socket) {
099 Runnable job = new Runnable() {
100 public void run() {
101 try {
102 ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
103 try {
104 String id = (String) ois.readObject();
105 processSlices((List) ois.readObject(), socket.getInetAddress(), id);
106 } finally {
107 ois.close();
108 socket.close();
109 }
110 } catch (Exception e) {
111 consumeException(e);
112 }
113 }
114 };
115 process(job);
116 }
117
118 /**
119 * Spawns a new thread for each job.
120 * Override to use thread pools on high loads.
121 * @param job
122 */
123 protected void process(Runnable job) {
124 new Thread(job, "Slice consumer").start();
125 }
126
127 /**
128 * Consumes exceptions in processing threads.
129 * @param e
130 */
131 protected void consumeException(Exception e) {
132 e.printStackTrace();
133 }
134
135 public static void main(String[] args) throws ConfigurationException {
136 System.out.println("Usage: java [options] biz.hammurapi.metrics.SocketSliceConsumerServer [port]");
137 int port = SocketSliceConsumer.DEFAULT_PORT;
138 System.out.println("Default port: "+port);
139 if (args.length>0) {
140 try {
141 port = Integer.parseInt(args[0]);
142 } catch (NumberFormatException e) {
143 System.err.println("Invalid port number: "+args[0]);
144 System.exit(1);
145 }
146 }
147
148 SocketSliceConsumerServer server = new SocketSliceConsumerServer();
149 server.setPort(port);
150 server.start();
151 }
152
153 }