package biz.hammurapi.jms;

import java.util.logging.Level;
import java.util.logging.Logger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.NamingException;

import biz.hammurapi.config.Component;
import biz.hammurapi.config.ComponentBase;
import biz.hammurapi.config.ConfigurationException;
import biz.hammurapi.config.RestartCommand;
import biz.hammurapi.config.Restartable;
import biz.hammurapi.config.RuntimeConfigurationException;
import biz.hammurapi.metrics.MeasurementCollector;
import biz.hammurapi.metrics.MeasurementConsumer;
import biz.hammurapi.util.Worker;

/**
 * Base class for JMS message processors.
 * <p>
 * On {@link #start()} the processor looks up a connection factory and a destination in the
 * initial context, opens a connection and registers one message listener per configured
 * listener session. Each received message is handed to {@link #processMessage(Message, Session)},
 * either through the optional {@link Worker} or directly in the listener thread.
 *
 * @author Pavel Vlasov
 */
public abstract class MessageProcessor extends ComponentBase implements Restartable {

    private static final String INITIAL_CONTEXT_PREFIX = "initialContext/";
    private static final Logger logger = Logger.getLogger(MessageProcessor.class.getName());

    protected Context initialContext;
    private String connectionFactoryName;
    private String destinationName;
    private int listeners = Runtime.getRuntime().availableProcessors();

    protected Connection connection;
    private Destination destination;
    protected boolean isTransacted;
    protected int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;

    protected Worker worker;

    private String user;
    private String pwd;
    protected ConnectionFactory connectionFactory;

    private String messageSelector;
    private boolean noLocal;

    protected RestartCommand restartCommand;

    /**
     * Number of message listening sessions created by {@link #start()}.
     * Defaults to the number of processors available to the JVM.
     * @param listeners Number of listeners.
     */
    public void setListeners(int listeners) {
        this.listeners = listeners;
    }

    /**
     * Initial (JNDI) context used to look up the connection factory and the destination.
     * @param initialContext Naming context.
     */
    public void setInitialContext(Context initialContext) {
        this.initialContext = initialContext;
    }

    /**
     * Request destination (queue or topic) name. The name is looked up in the initial context.
     * @param destinationName Destination name.
     */
    public void setDestination(String destinationName) {
        this.destinationName = destinationName;
    }

    /**
     * JMS connection factory name. The name is looked up in the initial context.
     * @param connectionFactoryName Connection factory name.
     */
    public void setConnectionFactory(String connectionFactoryName) {
        this.connectionFactoryName = connectionFactoryName;
    }

    /**
     * Worker to process requests. It is optional. If worker is not set or cannot process requests,
     * requests are processed in the message listener thread.
     * @param worker Worker to post processing jobs to.
     */
    public void setWorker(Worker worker) {
        this.worker = worker;
    }

    /**
     * Acknowledge mode. Valid values: AUTO (default), CLIENT, DUPS_OK.
     * The value is resolved to the <code>&lt;name&gt;_ACKNOWLEDGE</code> constant of {@link Session}.
     * @param acknowledgeModeName Acknowledge mode name without the <code>_ACKNOWLEDGE</code> suffix.
     * @throws IllegalArgumentException If there is no matching constant in {@link Session}.
     */
    public void setAcknowledgeMode(String acknowledgeModeName) {
        try {
            this.acknowledgeMode = Session.class.getField(acknowledgeModeName+"_ACKNOWLEDGE").getInt(null);
        } catch (Exception e) {
            IllegalArgumentException invalid = new IllegalArgumentException("Invalid acknowledge mode '"+acknowledgeModeName+"': "+e);
            // initCause() keeps the reflective failure attached without requiring the (String, Throwable) constructor.
            invalid.initCause(e);
            throw invalid;
        }
    }

    /**
     * Explicitly sets connection. This method is useful when several components share one connection.
     * <p>
     * NOTE(review): {@link #start()} as written always creates a new connection from the
     * connection factory and overwrites this field - confirm how an explicitly set connection
     * is expected to be used (e.g. by subclasses overriding <code>start()</code>).
     * @param connection JMS connection.
     */
    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    /**
     * Transactional attribute of JMS Session.
     * @param isTransacted <code>true</code> to create transacted sessions.
     */
    public void setTransacted(boolean isTransacted) {
        this.isTransacted = isTransacted;
    }

    /**
     * JMS Connection user name. Optional. If not set, the connection is created without credentials.
     * @param user User name.
     */
    public void setUser(String user) {
        this.user = user;
    }

    /**
     * JMS Connection password. Optional. Used only if user name is set.
     * @param pwd Password.
     */
    public void setPassword(String pwd) {
        this.pwd = pwd;
    }

    /**
     * Message selector passed to the consumers created in {@link #start()}.
     * @param messageSelector Message selector expression.
     */
    public void setMessageSelector(String messageSelector) {
        this.messageSelector = messageSelector;
    }

    /**
     * If true and destination is topic then messages produced by this connection are not consumed by the message processor.
     * @param noLocal No-local flag passed to the consumers.
     */
    public void setNoLocal(boolean noLocal) {
        this.noLocal = noLocal;
    }

    /**
     * Command which is executed in a new thread to restart the processor after
     * a connection exception. Optional - without it no restart is attempted.
     * @param command Restart command.
     */
    public void setRestartCommand(RestartCommand command) {
        this.restartCommand = command;
    }

    /**
     * Message listener class. Records a "receive" measurement and dispatches
     * the message for processing.
     * @author Pavel Vlasov
     */
    private class ProcessingMessageListener implements MessageListener {

        private final Session session;

        public ProcessingMessageListener(Session session) {
            this.session = session;
        }

        /**
         * Posts a processing job to the worker, if there is one. Messages are processed in the
         * calling thread, with this listener's session, if there is no worker
         * or the worker does not accept the job.
         */
        public synchronized void onMessage(final Message request) {
            logger.fine("Message received");
            addMeasurement("receive", 1, System.currentTimeMillis());

            if (worker==null) {
                _processMessage(request, session);
                return;
            }

            Runnable job = new Runnable() {

                public void run() {
                    // The session is not passed to jobs executed by the worker.
                    _processMessage(request, null);
                }

            };

            // Try to post job to worker. Process in the current thread if worker doesn't accept it.
            if (!worker.post(job)) {
                _processMessage(request, session);
            }
        }

    }

    /**
     * Looks up the connection factory and the destination, creates and starts the connection,
     * installs an exception listener, starts the worker (if it is a {@link Component})
     * and creates the message listeners.
     * <p>
     * If start fails after the connection was created, that connection is closed
     * before the exception is propagated.
     * @throws ConfigurationException If the processor could not be started.
     */
    public void start() throws ConfigurationException {
        // Tracks the connection created by this invocation so it can be released on failure.
        Connection created = null;
        try {
            logger.info("Starting ...");

            connectionFactory = (ConnectionFactory) initialContext.lookup(connectionFactoryName);
            if (user==null) {
                created = connectionFactory.createConnection();
            } else {
                created = connectionFactory.createConnection(user, pwd);
            }
            connection = created;
            connection.start();

            installExceptionListener();

            destination = (Destination) initialContext.lookup(destinationName);

            if (worker instanceof Component) {
                ((Component) worker).start();
            }

            if (worker instanceof MeasurementCollector) {
                // Worker measurements are reported through this processor with "worker." prefix.
                ((MeasurementCollector) worker).setMeasurementConsumer(
                        new MeasurementConsumer() {

                            public void addMeasurement(String name, double value, long time) {
                                MessageProcessor.this.addMeasurement("worker."+name, value, time);
                            }

                        });
            }

            for (int i=0; i<listeners; ++i) {
                final Session session = borrowSession();
                session.createConsumer(destination, messageSelector, noLocal).setMessageListener(new ProcessingMessageListener(session));
            }

            logger.info("Started");
        } catch (Exception e) {
            closeQuietly(created);

            if (e instanceof ConfigurationException) {
                throw (ConfigurationException) e;
            }

            throw new ConfigurationException("Could not start message processor: "+e, e);
        }
    }

    /**
     * Wraps the connection's current exception listener with one which delegates to it,
     * logs the exception and, if a restart command is set, stops the processor and
     * executes the restart command in a new thread.
     */
    private void installExceptionListener() throws JMSException {
        final ExceptionListener pel = connection.getExceptionListener();
        connection.setExceptionListener(new ExceptionListener() {

            public void onException(JMSException e) {
                if (pel!=null) {
                    pel.onException(e);
                }

                logger.log(Level.SEVERE, "JMS connection exception: " + e, e);

                if (restartCommand!=null) {
                    try {
                        connection.setExceptionListener(pel); // To avoid double firing if there is another exception in stop().
                        stop();
                    } catch (Exception ex) {
                        logger.log(Level.SEVERE, "Cannot stop the processor: "+ex, ex);
                    } finally {
                        // Restart is attempted even if stop failed.
                        new Thread(restartCommand, "Connection restart thread "+restartCommand.getAttempt()).start();
                    }
                }

            }

        });
    }

    /**
     * Closes the connection, if any, logging a close failure instead of throwing it.
     */
    private static void closeQuietly(Connection toClose) {
        if (toClose==null) {
            return;
        }

        try {
            toClose.close();
        } catch (Exception closeFailure) {
            logger.log(Level.WARNING, "Could not close JMS connection after failed start: "+closeFailure, closeFailure);
        }
    }

    /**
     * Processes request message
     * @param request Request message
     * @param session Session if message is processed in the message listener thread (worker is null or cannot process jobs), null otherwise.
     */
    protected abstract void processMessage(Message request, Session session);

    /**
     * Invokes {@link #processMessage(Message, Session)} and records a "process" measurement
     * with the elapsed time in milliseconds, whether or not processing completed normally.
     * @param request Request message
     * @param session Session or null, passed through to <code>processMessage()</code>.
     */
    protected void _processMessage(Message request, Session session) {
        long start = System.currentTimeMillis();
        try {
            processMessage(request, session);
        } finally {
            long now = System.currentTimeMillis();
            addMeasurement("process", now - start, now);
        }
    }

    /**
     * Stops worker (thread pool), if any, and connection.
     * The connection is closed even if stopping the worker fails; the first failure is reported.
     * @throws ConfigurationException If the worker could not be stopped or the connection could not be closed.
     */
    public void stop() throws ConfigurationException {
        logger.info("Stopping ...");

        Exception failure = null;

        // Stop worker
        try {
            if (worker instanceof Component) {
                ((Component) worker).stop();
            }
        } catch (Exception e) {
            failure = e;
        }

        // No need to close individual sessions.
        try {
            if (connection!=null) {
                connection.close();
            }
        } catch (Exception e) {
            if (failure==null) {
                failure = e;
            } else {
                // The worker failure is the one propagated; don't lose this one silently.
                logger.log(Level.SEVERE, "Could not close JMS connection: "+e, e);
            }
        }

        if (failure==null) {
            logger.info("Stopped");
            return;
        }

        if (failure instanceof ConfigurationException) {
            throw (ConfigurationException) failure;
        }

        throw new ConfigurationException("Could not stop message processor: "+failure, failure);
    }

    /**
     * This implementation simply creates a new session using configured
     * transactional attribute and acknowledge mode.
     * Subclasses can override this method to implement session pooling.
     * @return Session created from the connection.
     * @throws JMSException If the session could not be created.
     */
    protected Session borrowSession() throws JMSException {
        return connection.createSession(isTransacted, acknowledgeMode);
    }

    /**
     * This implementation simply closes the session.
     * Subclasses can override this method to implement session pooling.
     * @param session Session to release.
     * @throws JMSException If the session could not be closed.
     */
    protected void releaseSession(Session session) throws JMSException {
        session.close();
    }

    /**
     * Provides access to destination, initial context, connection, connection factory, and worker.
     * Bridges to initial context. For names in a form initialContext/&lt;name&gt; the name is looked up in initial context.
     * Other names are delegated to the superclass.
     * @param name Child name.
     * @return Child object.
     * @throws RuntimeConfigurationException If lookup in the initial context fails.
     */
    protected Object getChild(String name) {
        if ("destination".equals(name)) {
            return destination;
        }

        if ("initialContext".equals(name)) {
            return initialContext;
        }

        if (name!=null && name.startsWith(INITIAL_CONTEXT_PREFIX)) {
            String jndiName = name.substring(INITIAL_CONTEXT_PREFIX.length());
            try {
                return initialContext.lookup(jndiName);
            } catch (NamingException e) {
                throw new RuntimeConfigurationException("Lookup in inital context failed for JNDI name '"+jndiName+"':"+e, e);
            }
        }

        if ("connection".equals(name)) {
            return connection;
        }

        if ("connectionFactory".equals(name)) {
            return connectionFactory;
        }

        if ("worker".equals(name)) {
            return worker;
        }

        return super.getChild(name);
    }
}