001 package biz.hammurapi.jms; 002 003 import java.util.logging.Level; 004 import java.util.logging.Logger; 005 006 import javax.jms.Connection; 007 import javax.jms.ConnectionFactory; 008 import javax.jms.Destination; 009 import javax.jms.JMSException; 010 import javax.jms.Message; 011 import javax.jms.MessageConsumer; 012 import javax.jms.MessageProducer; 013 import javax.jms.Session; 014 import javax.jms.TemporaryQueue; 015 import javax.naming.Context; 016 import javax.naming.NamingException; 017 018 import biz.hammurapi.config.ConfigurationException; 019 import biz.hammurapi.config.RuntimeConfigurationException; 020 021 /** 022 * Base class for JMS adapters which operate on two queue pairs. 023 * 024 * @author Pavel Vlasov 025 */ 026 public abstract class Adapter extends MessageProcessor { 027 028 private static final Logger logger = java.util.logging.Logger.getLogger(Adapter.class.getName()); 029 030 private static final String BACK_END_INITIAL_CONTEXT_PREFIX = "backEndInitialContext/"; 031 032 protected Context backEndInitialContext; 033 034 private String backEndConnectionFactoryName; 035 036 private String backEndDestinationName; 037 038 private String backEndReplyDestinationName; 039 040 private String replyDestinationName; 041 042 /** 043 * Initial context to look-up back-end destinations and connection factory. 044 * If back-end initial context is not set then back-end JMS objects are 045 * looked up in the "front-end" initial context. 046 * 047 * @param initialContext 048 */ 049 public void setBackEndInitialContext(Context initialContext) { 050 this.initialContext = initialContext; 051 } 052 053 /** 054 * Back-end request destination (queue or topic) name. 055 * 056 * @param destinationName 057 */ 058 public void setBackEndDestination(String destinationName) { 059 this.backEndDestinationName = destinationName; 060 } 061 062 /** 063 * Back-end reply destination (queue or topic) name. 064 * 065 * @param destinationName 066 */ 067 public void setBackEndReplyDestination(String destinationName) { 068 this.backEndReplyDestinationName = destinationName; 069 } 070 071 /** 072 * Front-end reply destination (queue or topic) name. 073 * 074 * @param destinationName 075 */ 076 public void setReplyDestination(String destinationName) { 077 this.replyDestinationName = destinationName; 078 } 079 080 /** 081 * Back-end JMS connection name. It it is not set then the "front-end" 082 * connection factory is used. 083 * 084 * @param connectionName 085 */ 086 public void setBackEndConnectionFactory(String connectionFactoryName) { 087 this.backEndConnectionFactoryName = connectionFactoryName; 088 } 089 090 protected Connection backEndConnection; 091 092 private Destination backEndDestination; 093 094 private Destination backEndReplyDestination; 095 096 private Destination replyDestination; 097 098 protected boolean isBackEndTransacted; 099 100 protected int backEndAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; 101 102 protected long replyTimeToLive; 103 104 protected long backEndRequestTimeToLive; 105 106 protected long backEndTimeout; 107 108 /** 109 * Timeout for backend reply. 110 * @param backEndTimeout 111 */ 112 public void setBackEndTimeout(long backEndTimeout) { 113 this.backEndTimeout = backEndTimeout; 114 } 115 116 /** 117 * Time to live for reply. 118 * @param replyTimeToLive 119 */ 120 public void setReplyTimeToLive(long replyTimeToLive) { 121 this.replyTimeToLive = replyTimeToLive; 122 } 123 124 /** 125 * Time to live for back-end request 126 * @param backEndRequestTimeToLive 127 */ 128 public void setBackEndRequestTimeToLive(long backEndRequestTimeToLive) { 129 this.backEndRequestTimeToLive = backEndRequestTimeToLive; 130 } 131 132 /** 133 * Back-end acknoledge mode. Valid values: AUTO (default), CLIENT, DUPS_OK. 134 * 135 * @param acknowledgeModeName 136 */ 137 public void setBackEndAcknowledgeMode(String acknowledgeModeName) { 138 try { 139 this.backEndAcknowledgeMode = Session.class.getField(acknowledgeModeName + "_ACKNOWLEDGE").getInt(null); 140 } catch (Exception e) { 141 throw new IllegalArgumentException("Invalid back-end acknowledge mode '" + acknowledgeModeName + "': " + e); 142 } 143 } 144 145 /** 146 * Explicitly sets back-end connection. This method is useful when several 147 * components share one connection. 148 * 149 * @param connection 150 */ 151 public void setBackEndConnection(Connection connection) { 152 this.backEndConnection = connection; 153 } 154 155 /** 156 * Transactional attribute of back-end JMS Session. 157 * 158 * @param isTransacted 159 */ 160 public void setBackEndTransacted(boolean isTransacted) { 161 this.isBackEndTransacted = isTransacted; 162 } 163 164 private String backEndUser; 165 166 private String backEndPwd; 167 168 protected ConnectionFactory backEndConnectionFactory; 169 170 /** 171 * Back-end JMS Connection user name. Optional. 172 * 173 * @param user 174 */ 175 public void setBackEndUser(String user) { 176 this.backEndUser = user; 177 } 178 179 /** 180 * Back-end JMS Connection password. Optional. 181 * 182 * @param pwd 183 */ 184 public void setPassword(String pwd) { 185 this.backEndPwd = pwd; 186 } 187 188 public void start() throws ConfigurationException { 189 super.start(); 190 try { 191 logger.info("Starting ..."); 192 193 if (backEndConnectionFactoryName == null) { 194 backEndConnectionFactory = connectionFactory; 195 backEndConnection = connection; 196 } else { 197 backEndConnectionFactory = (ConnectionFactory) initialContext.lookup(backEndConnectionFactoryName); 198 if (backEndUser == null) { 199 backEndConnection = backEndConnectionFactory.createConnection(); 200 } else { 201 backEndConnection = backEndConnectionFactory.createConnection(backEndUser, backEndPwd); 202 } 203 backEndConnection.start(); 204 } 205 206 if (backEndInitialContext == null) { 207 backEndInitialContext = initialContext; 208 } 209 210 backEndDestination = (Destination) backEndInitialContext.lookup(backEndDestinationName); 211 backEndReplyDestination = (Destination) backEndInitialContext.lookup(backEndReplyDestinationName); 212 213 if (replyDestinationName!=null) { 214 replyDestination = (Destination) initialContext.lookup(replyDestinationName); 215 } 216 217 logger.info("Started"); 218 } catch (Exception e) { 219 if (e instanceof ConfigurationException) { 220 throw (ConfigurationException) e; 221 } 222 223 throw new ConfigurationException("Could not start message processor: " + e, e); 224 } 225 } 226 227 /** 228 * Processes request message by passing to processRequest() and 229 * processResponse() methods. 230 * 231 * @param request 232 * Request message 233 * @param session 234 * Session if message is processed in the message listener thread 235 * (worker is null or cannot process jobs), null otherwise. 236 */ 237 protected void processMessage(Message request, Session session) { 238 try { 239 Session frontEndSession = session == null ? borrowSession() : session; 240 Destination frontEndReplyDestination = request.getJMSReplyTo() == null ? replyDestination : request.getJMSReplyTo(); 241 Message frontEndReply = null; 242 try { 243 Session backEndSession = connection == backEndConnection ? session : borrowBackEndSession(); 244 try { 245 long start = System.currentTimeMillis(); 246 Message backEndRequest = processRequest(request, frontEndSession, backEndSession); 247 long afterProcessRequest = System.currentTimeMillis(); 248 addMeasurement("process-request", afterProcessRequest-start, afterProcessRequest); 249 if (backEndRequest!=null) { 250 Destination brd = backEndReplyDestination; 251 if (brd==null) { 252 brd = backEndSession.createTemporaryQueue(); 253 } 254 backEndRequest.setJMSReplyTo(brd); 255 MessageProducer backEndProducer = backEndSession.createProducer(backEndDestination); 256 257 String messageSelector; 258 259 try { 260 backEndProducer.send(backEndRequest); 261 messageSelector = messageSelector(backEndRequest); 262 } finally { 263 backEndProducer.close(); 264 } 265 266 Message backEndReply; 267 MessageConsumer consumer; 268 if (brd instanceof TemporaryQueue) { 269 logger.fine("Waiting on "+brd); 270 consumer = session.createConsumer(brd); 271 } else { 272 logger.fine("Waiting for "+messageSelector+" on "+brd); 273 consumer = session.createConsumer(brd, messageSelector); 274 } 275 276 try { 277 backEndReply = backEndTimeout == 0 ? consumer.receive() : consumer.receive(backEndTimeout); 278 } finally { 279 consumer.close(); 280 if (brd instanceof TemporaryQueue) { 281 ((TemporaryQueue) brd).delete(); 282 } 283 } 284 long afterBackEndReply = System.currentTimeMillis(); 285 addMeasurement("back-end", afterBackEndReply-afterProcessRequest, afterBackEndReply); 286 287 if (frontEndReplyDestination==null) { 288 logger.warning("Unable to send reply to front-end - reply destination is null"); 289 } else { 290 frontEndReply = processReply(backEndReply, request, frontEndSession, backEndSession); 291 long afterProcessReply = System.currentTimeMillis(); 292 addMeasurement("process-reply", afterProcessReply-afterBackEndReply, afterProcessReply); 293 } 294 } 295 } catch (Exception e) { 296 frontEndReply = handleException(e, frontEndSession); 297 } finally { 298 if (backEndSession!=frontEndSession) { 299 releaseBackEndSession(backEndSession); 300 } 301 } 302 303 if (frontEndReply!=null) { 304 if (frontEndReply.getJMSCorrelationID()==null) { 305 frontEndReply.setJMSCorrelationID(request.getJMSCorrelationID()==null ? request.getJMSMessageID() : request.getJMSCorrelationID()); 306 } 307 MessageProducer replyProducer = frontEndSession.createProducer(frontEndReplyDestination); 308 try { 309 if (replyTimeToLive!=0) { 310 replyProducer.setTimeToLive(replyTimeToLive); 311 } 312 replyProducer.send(frontEndReply); 313 } finally { 314 replyProducer.close(); 315 } 316 } 317 } finally { 318 if (session == null) { 319 releaseSession(frontEndSession); 320 } 321 } 322 } catch (Exception e) { 323 logger.log(Level.SEVERE, "Exception: "+e, e); 324 } 325 } 326 327 /** 328 * Processes front-end request message and builds back-end request message. 329 * 330 * @param request 331 * Request from front-end 332 * @param frontEndSession 333 * Front-end JMS session 334 * @param backEndSession 335 * Back-end JMS session. Equals to the front-end session if back 336 * end connection is not set and the front-end connection is used 337 * to connect to both front and back end. 338 * @return Message to be sent to the back-end request queue. Null if no message shall be sent (e.g. in the case of exception). 339 */ 340 protected abstract Message processRequest(Message frontEndRequest, Session frontEndSession, Session backEndSession); 341 342 /** 343 * Processes back-end reply and forms reply to front-end. This method is 344 * responsible for setting correlation ID. 345 * 346 * @param backEndReply 347 * Reply message from back-end. Null if request timed out. 348 * @param frontEndRequest 349 * Original request from front end. 350 * @param frontEndSession 351 * Front-end JMS session. 352 * @param backEndSession 353 * Back-end JMS session. Equals to the front-end session if back 354 * end connection is not set and the front-end connection is used 355 * to connect to both front and back end. 356 * @return Message to be sent to the front-end reply queue. Null if no message shall be sent (e.g. in the case of exception). 357 */ 358 protected abstract Message processReply(Message backEndReply, Message frontEndRequest, Session frontEndSession, Session backEndSession); 359 360 /** 361 * Method to report exceptions to fron-end (excluding JMS exceptions in front-end JMS objects). This implementation returns null. 362 * @param exception Exception to report 363 * @param frontEndSession Front-end JMS session 364 * @return Message to send to front-end to reply queue. Null if no message shall be sent. 365 */ 366 protected Message handleException(Exception exception, Session frontEndSession) { 367 return null; 368 } 369 370 /** 371 * Extracts message selector for reply back-end message from the back-end 372 * request message. This implementation returns 373 * <code>"JMSCorrelationID='" +backEndRequest.getJMSMessageID()+"'"</code> 374 * 375 * @param backEndRequest 376 * @return Message selector to retrieve back-end reply. 377 * @throws JMSException 378 */ 379 protected String messageSelector(Message backEndRequest) throws JMSException { 380 return "JMSCorrelationID='" + backEndRequest.getJMSMessageID() + "'"; 381 } 382 383 /** 384 * Stops worker (thread pool), if any, and connection. 385 */ 386 public void stop() throws ConfigurationException { 387 try { 388 logger.info("Stopping ..."); 389 390 if (backEndConnection != null && backEndConnection != connection) { 391 backEndConnection.close(); 392 } 393 394 logger.info("Stopped"); 395 } catch (Exception e) { 396 if (e instanceof ConfigurationException) { 397 throw (ConfigurationException) e; 398 } 399 400 throw new ConfigurationException("Could not stop message processor"); 401 } 402 } 403 404 /** 405 * This implementation simply creates a new session. Subclasses can override 406 * this method to implement session pooling. 407 * 408 * @return 409 * @throws JMSException 410 */ 411 protected Session borrowBackEndSession() throws JMSException { 412 return backEndConnection.createSession(isBackEndTransacted, backEndAcknowledgeMode); 413 } 414 415 /** 416 * This implementation simply closes the session. Subclasses can override 417 * this method to implement session pooling. 418 * 419 * @throws JMSException 420 */ 421 protected void releaseBackEndSession(Session session) throws JMSException { 422 session.close(); 423 } 424 425 /** 426 * Provides access to back-end destination, reply destinations, initial 427 * context, and connection in addition to objects accessible through 428 * superclass getChild() method. Bridges to back-end initial context. For 429 * names in a form backEndInitialContext/<name> the name is looked up 430 * in initial context. 431 */ 432 protected Object getChild(String name) { 433 if ("replyDestination".equals(name)) { 434 return replyDestination; 435 } 436 437 if ("backEndReplyDestination".equals(name)) { 438 return backEndReplyDestination; 439 } 440 441 if ("backEndDestination".equals(name)) { 442 return backEndDestination; 443 } 444 445 if ("backEndInitialContext".equals(name)) { 446 return backEndInitialContext; 447 } 448 449 if (name != null && name.startsWith(BACK_END_INITIAL_CONTEXT_PREFIX)) { 450 String jndiName = name.substring(BACK_END_INITIAL_CONTEXT_PREFIX.length()); 451 try { 452 return backEndInitialContext.lookup(jndiName); 453 } catch (NamingException e) { 454 throw new RuntimeConfigurationException("Lookup in back-end inital context failed for JNDI name '" + jndiName + "':" + e, e); 455 } 456 } 457 458 if ("backEndConnection".equals(name)) { 459 return backEndConnection; 460 } 461 462 if ("backEndConnectionFactory".equals(name)) { 463 return backEndConnectionFactory; 464 } 465 466 return super.getChild(name); 467 } 468 469 }