001    package biz.hammurapi.jms;
002    
003    import java.util.logging.Level;
004    import java.util.logging.Logger;
005    
006    import javax.jms.Connection;
007    import javax.jms.ConnectionFactory;
008    import javax.jms.Destination;
009    import javax.jms.ExceptionListener;
010    import javax.jms.JMSException;
011    import javax.jms.Message;
012    import javax.jms.MessageListener;
013    import javax.jms.Session;
014    import javax.naming.Context;
015    import javax.naming.NamingException;
016    
017    import biz.hammurapi.config.Component;
018    import biz.hammurapi.config.ComponentBase;
019    import biz.hammurapi.config.ConfigurationException;
020    import biz.hammurapi.config.RestartCommand;
021    import biz.hammurapi.config.Restartable;
022    import biz.hammurapi.config.RuntimeConfigurationException;
023    import biz.hammurapi.metrics.MeasurementCollector;
024    import biz.hammurapi.metrics.MeasurementConsumer;
025    import biz.hammurapi.util.Worker;
026    
027    /**
028     * Base class for JMS message processors.
029     * @author Pavel Vlasov
030     */
031    public abstract class MessageProcessor extends ComponentBase implements Restartable {
032            
033            private static final String INITIAL_CONTEXT_PREFIX = "initialContext/";
034            private static final Logger logger = java.util.logging.Logger.getLogger(MessageProcessor.class.getName());
035                            
036            protected Context initialContext;
037            private String connectionFactoryName;
038            private String destinationName;
039            private int listeners = Runtime.getRuntime().availableProcessors();
040            
041            /**
042             * Number of message listening threads. Defaults to number of processes available to JVM.
043             * @param listeners
044             */
045            public void setListeners(int listeners) {
046                    this.listeners = listeners;
047            }
048            
049            public void setInitialContext(Context initialContext) {
050                    this.initialContext = initialContext;
051            }
052            
053            /**
054             * Request destination (queue or topic) name
055             * @param destinationName
056             */
057            public void setDestination(String destinationName) {
058                    this.destinationName = destinationName;
059            }
060            
061            /**
062             * JMS connection name
063             * @param connectionName
064             */
065            public void setConnectionFactory(String connectionFactoryName) {
066                    this.connectionFactoryName = connectionFactoryName;
067            }
068            
069            /**
070             * Worker to process requests. It is optional. If worker is not set or cannot process requests,
071             * requests are processed in the message listener thread.
072             * @param worker
073             */
074            public void setWorker(Worker worker) {
075                    this.worker = worker;
076            }
077            
078            protected Connection connection;
079            private Destination destination;
080            protected boolean isTransacted;
081            protected int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
082    
083            /**
084             * Acknoledge mode. Valid values: AUTO (default), CLIENT, DUPS_OK.
085             * @param acknowledgeModeName
086             */
087            public void setAcknowledgeMode(String acknowledgeModeName) {
088                    try {
089                            this.acknowledgeMode = Session.class.getField(acknowledgeModeName+"_ACKNOWLEDGE").getInt(null);
090                    } catch (Exception e) {
091                            throw new IllegalArgumentException("Invalid acknowledge mode '"+acknowledgeModeName+"': "+e);
092                    }
093            }
094            
095            /**
096             * Explicitly sets connection. This method is useful when several components share one connection.
097             * @param connection
098             */
099            public void setConnection(Connection connection) {
100                    this.connection = connection;
101            }
102            
103            /**
104             * Transactional attribute of JMS Session.
105             * @param isTransacted
106             */
107            public void setTransacted(boolean isTransacted) {
108                    this.isTransacted = isTransacted;
109            }
110            
111            protected Worker worker;        
112            
113            private String user;
114            private String pwd;
115            protected ConnectionFactory connectionFactory;
116            
117            /** 
118             * JMS Connection user name. Optional.
119             * @param user
120             */
121            public void setUser(String user) {
122                    this.user = user;
123            }
124            
125            /**
126             * JMS Connection password. Optional.
127             * @param pwd
128             */
129            public void setPassword(String pwd) {
130                    this.pwd = pwd;
131            }
132    
133            /**
134             * Message listener class.
135             * @author Pavel Vlasov
136             */
137            private class ProcessingMessageListener implements MessageListener {
138                    
139                    private Session session;
140    
141                    public ProcessingMessageListener(Session session) {
142                            this.session = session;
143                    }               
144                    
145                    public synchronized void onMessage(final Message request) {
146                            logger.fine("Message received");
147                            addMeasurement("receive", 1, System.currentTimeMillis());
148                            
149                            if (worker==null) {
150                                    _processMessage(request, session);
151                            } else {
152                                    Runnable job = new Runnable() {
153    
154                                            public void run() {
155                                                    _processMessage(request, null);
156                                            }
157                                            
158                                    };
159                                    
160                                    // Try to post job to worker. process in the current thread if worker doesn't accept it.
161                                    if (!worker.post(job)) {
162                                            _processMessage(request, session);
163                                    }
164                            }                                               
165                    }
166                                    
167            };
168    
169            public void start() throws ConfigurationException {
170                    try {                   
171                            logger.info("Starting ...");
172                            
173                            connectionFactory = (ConnectionFactory) initialContext.lookup(connectionFactoryName);
174                            if (user==null) {
175                                    connection = connectionFactory.createConnection();
176                            } else {
177                                    connection = connectionFactory.createConnection(user, pwd);
178                            }
179                            connection.start();
180                            
181                            final ExceptionListener pel = connection.getExceptionListener();
182                            connection.setExceptionListener(new ExceptionListener() {
183    
184                                    public void onException(JMSException e) {
185                                            if (pel!=null) {
186                                                    pel.onException(e);
187                                            }
188                                                                                    
189                                            logger.log(Level.SEVERE, "JMS connection exception: " + e, e);
190                                            
191                                            if (restartCommand!=null) {
192                                                    try {
193                                                            try {
194                                                                    connection.setExceptionListener(pel); // To avoid double firing if there is another exception in stop().
195                                                                    stop();
196                                                            } catch (Exception ex) {
197                                                                    ex.printStackTrace();
198                                                                    logger.log(Level.SEVERE, "Cannot stop the processor: "+ex, ex);
199                                                            }
200                                                    } finally {
201                                                            new Thread(restartCommand, "Connection restart thread "+restartCommand.getAttempt()).start();
202                                                    }
203                                            }
204                                            
205                                    }
206                                    
207                            });
208                            
209                            destination = (Destination) initialContext.lookup(destinationName);
210                            
211                            if (worker instanceof Component) {
212                                    ((Component) worker).start();
213                            }
214                            
215                            if (worker instanceof MeasurementCollector) {
216                                    ((MeasurementCollector) worker).setMeasurementConsumer(
217                                                    new MeasurementConsumer() {
218    
219                                                            public void addMeasurement(String name, double value, long time) {
220                                                                    MessageProcessor.this.addMeasurement("worker."+name, value, time);                                                              
221                                                            }
222                                                            
223                                                    });
224                            }                       
225                            
226                            for (int i=0; i<listeners; ++i) {
227                                    final Session session = borrowSession();
228                                    session.createConsumer(destination, messageSelector, noLocal).setMessageListener(new ProcessingMessageListener(session));
229                            }
230    
231                            logger.info("Started");
232                    } catch (Exception e) {
233                            if (e instanceof ConfigurationException) {
234                                    throw (ConfigurationException) e;
235                            }
236                            
237                            throw new ConfigurationException("Could not start message processor: "+e, e);
238                    }               
239            }
240            
241            /**
242             * Processes request message
243             * @param request Request message
244             * @param session Session if message is processed in the message listener thread (worker is null or cannot process jobs), null otherwise.
245             */
246            protected abstract void processMessage(Message request, Session session);
247            
248            protected void _processMessage(Message request, Session session) {
249                    long start = System.currentTimeMillis();
250                    try {
251                            processMessage(request, session);
252                    } finally {
253                            long now = System.currentTimeMillis();
254                            addMeasurement("process", now - start, now);
255                    }
256            }
257            
258            /**
259             * Stops worker (thread pool), if any, and connection.
260             */
261            public void stop() throws ConfigurationException {
262                    try {                   
263                            logger.info("Stopping ...");
264                            
265                            // Stop worker
266                            if (worker instanceof Component) {
267                                    ((Component) worker).stop();
268                            }
269                                                    
270                            // No need to close individual sessions.
271                            if (connection!=null) {
272                                    connection.close();
273                            }               
274                            
275                            logger.info("Stopped");
276                    } catch (Exception e) {
277                            if (e instanceof ConfigurationException) {
278                                    throw (ConfigurationException) e;
279                            }
280                            
281                            throw new ConfigurationException("Could not stop message processor");
282                    }
283            }
284            
285            /**
286             * This implementation simply creates a new session.
287             * Subclasses can override this method to implement session pooling.
288             * @return
289             * @throws JMSException 
290             */
291            protected Session borrowSession() throws JMSException {
292                    return connection.createSession(isTransacted, acknowledgeMode);
293            }
294    
295            /**
296             * This implementation simply closes the session.
297             * Subclasses can override this method to implement session pooling.
298             * @throws JMSException 
299             */
300            protected void releaseSession(Session session) throws JMSException {
301                    session.close();
302            }
303    
304            /**
305             * Provides access to destination, initial context, connection, and worker.
306             * Bridges to initial context. For names in a form initialContext/&lt;name&gt; the name is looked up in initial context.
307             */
308            protected Object getChild(String name) {
309                    if ("destination".equals(name)) {
310                            return destination;
311                    }
312                    
313                    if ("initialContext".equals(name)) {
314                            return initialContext;
315                    }
316                    
317                    if (name!=null && name.startsWith(INITIAL_CONTEXT_PREFIX)) {
318                            String jndiName = name.substring(INITIAL_CONTEXT_PREFIX.length());
319                            try {
320                                    return initialContext.lookup(jndiName);
321                            } catch (NamingException e) {
322                                    throw new RuntimeConfigurationException("Lookup in inital context failed for JNDI name '"+jndiName+"':"+e, e);
323                            }
324                    }
325                    
326                    if ("connection".equals(name)) {
327                            return connection;
328                    }
329                    
330                    if ("connectionFactory".equals(name)) {
331                            return connectionFactory;
332                    }
333                    
334                    if ("worker".equals(name)) {
335                            return worker;
336                    }               
337                    
338                    return super.getChild(name);
339            }
340            
341            private String messageSelector;
342            
343            private boolean noLocal;
344            
345            /**
346             * Message selector
347             * @param messageSelector
348             */
349            public void setMessageSelector(String messageSelector) {
350                    this.messageSelector = messageSelector;
351            }
352            
353            /**
354             * If true and destination is topic then messages produced by this connection are not consumed by the message processor.
355             * @param noLocal
356             */
357            public void setNoLocal(boolean noLocal) {
358                    this.noLocal = noLocal;
359            }
360            
361            protected RestartCommand restartCommand;
362                    
363            public void setRestartCommand(RestartCommand command) {
364                    this.restartCommand = command;          
365            }
366    }