001 package biz.hammurapi.jms;
002
003 import javax.jms.Connection;
004 import javax.jms.ConnectionFactory;
005 import javax.jms.Destination;
006 import javax.jms.JMSException;
007 import javax.jms.Message;
008 import javax.jms.MessageConsumer;
009 import javax.jms.MessageProducer;
010 import javax.jms.Session;
011 import javax.jms.TemporaryQueue;
012 import javax.naming.Context;
013 import javax.naming.NamingException;
014
015 import biz.hammurapi.config.ConfigurationException;
016 import biz.hammurapi.config.RuntimeConfigurationException;
017
018 /**
019 * Base class for JMS adapters which operate on two queue pairs.
020 *
021 * @author Pavel Vlasov
022 */
023 public abstract class Adapter extends MessageProcessor {
024
025 private static final String BACK_END_INITIAL_CONTEXT_PREFIX = "backEndInitialContext/";
026
027 protected Context backEndInitialContext;
028
029 private String backEndConnectionFactoryName;
030
031 private String backEndDestinationName;
032
033 private String backEndReplyDestinationName;
034
035 private String replyDestinationName;
036
037 /**
038 * Initial context to look-up back-end destinations and connection factory.
039 * If back-end initial context is not set then back-end JMS objects are
040 * looked up in the "front-end" initial context.
041 *
042 * @param initialContext
043 */
044 public void setBackEndInitialContext(Context initialContext) {
045 this.initialContext = initialContext;
046 }
047
048 /**
049 * Back-end request destination (queue or topic) name.
050 *
051 * @param destinationName
052 */
053 public void setBackEndDestination(String destinationName) {
054 this.backEndDestinationName = destinationName;
055 }
056
057 /**
058 * Back-end reply destination (queue or topic) name.
059 *
060 * @param destinationName
061 */
062 public void setBackEndReplyDestination(String destinationName) {
063 this.backEndReplyDestinationName = destinationName;
064 }
065
066 /**
067 * Front-end reply destination (queue or topic) name.
068 *
069 * @param destinationName
070 */
071 public void setReplyDestination(String destinationName) {
072 this.replyDestinationName = destinationName;
073 }
074
075 /**
076 * Back-end JMS connection name. It it is not set then the "front-end"
077 * connection factory is used.
078 *
079 * @param connectionName
080 */
081 public void setBackEndConnectionFactory(String connectionFactoryName) {
082 this.backEndConnectionFactoryName = connectionFactoryName;
083 }
084
085 protected Connection backEndConnection;
086
087 private Destination backEndDestination;
088
089 private Destination backEndReplyDestination;
090
091 private Destination replyDestination;
092
093 protected boolean isBackEndTransacted;
094
095 protected int backEndAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
096
097 protected long replyTimeToLive;
098
099 protected long backEndRequestTimeToLive;
100
101 protected long backEndTimeout;
102
103 /**
104 * Timeout for backend reply.
105 * @param backEndTimeout
106 */
107 public void setBackEndTimeout(long backEndTimeout) {
108 this.backEndTimeout = backEndTimeout;
109 }
110
111 /**
112 * Time to live for reply.
113 * @param replyTimeToLive
114 */
115 public void setReplyTimeToLive(long replyTimeToLive) {
116 this.replyTimeToLive = replyTimeToLive;
117 }
118
119 /**
120 * Time to live for back-end request
121 * @param backEndRequestTimeToLive
122 */
123 public void setBackEndRequestTimeToLive(long backEndRequestTimeToLive) {
124 this.backEndRequestTimeToLive = backEndRequestTimeToLive;
125 }
126
127 /**
128 * Back-end acknoledge mode. Valid values: AUTO (default), CLIENT, DUPS_OK.
129 *
130 * @param acknowledgeModeName
131 */
132 public void setBackEndAcknowledgeMode(String acknowledgeModeName) {
133 try {
134 this.backEndAcknowledgeMode = Session.class.getField(acknowledgeModeName + "_ACKNOWLEDGE").getInt(null);
135 } catch (Exception e) {
136 throw new IllegalArgumentException("Invalid back-end acknowledge mode '" + acknowledgeModeName + "': " + e);
137 }
138 }
139
140 /**
141 * Explicitly sets back-end connection. This method is useful when several
142 * components share one connection.
143 *
144 * @param connection
145 */
146 public void setBackEndConnection(Connection connection) {
147 this.backEndConnection = connection;
148 }
149
150 /**
151 * Transactional attribute of back-end JMS Session.
152 *
153 * @param isTransacted
154 */
155 public void setBackEndTransacted(boolean isTransacted) {
156 this.isBackEndTransacted = isTransacted;
157 }
158
159 private String backEndUser;
160
161 private String backEndPwd;
162
163 protected ConnectionFactory backEndConnectionFactory;
164
165 /**
166 * Back-end JMS Connection user name. Optional.
167 *
168 * @param user
169 */
170 public void setBackEndUser(String user) {
171 this.backEndUser = user;
172 }
173
174 /**
175 * Back-end JMS Connection password. Optional.
176 *
177 * @param pwd
178 */
179 public void setPassword(String pwd) {
180 this.backEndPwd = pwd;
181 }
182
183 public void start() throws ConfigurationException {
184 super.start();
185 try {
186 logger.info(this, "Starting ...");
187
188 if (backEndConnectionFactoryName == null) {
189 backEndConnectionFactory = connectionFactory;
190 backEndConnection = connection;
191 } else {
192 backEndConnectionFactory = (ConnectionFactory) initialContext.lookup(backEndConnectionFactoryName);
193 if (backEndUser == null) {
194 backEndConnection = backEndConnectionFactory.createConnection();
195 } else {
196 backEndConnection = backEndConnectionFactory.createConnection(backEndUser, backEndPwd);
197 }
198 backEndConnection.start();
199 }
200
201 if (backEndInitialContext == null) {
202 backEndInitialContext = initialContext;
203 }
204
205 backEndDestination = (Destination) backEndInitialContext.lookup(backEndDestinationName);
206 backEndReplyDestination = (Destination) backEndInitialContext.lookup(backEndReplyDestinationName);
207
208 if (replyDestinationName!=null) {
209 replyDestination = (Destination) initialContext.lookup(replyDestinationName);
210 }
211
212 logger.info(this, "Started");
213 } catch (Exception e) {
214 if (e instanceof ConfigurationException) {
215 throw (ConfigurationException) e;
216 }
217
218 throw new ConfigurationException("Could not start message processor: " + e, e);
219 }
220 }
221
222 /**
223 * Processes request message by passing to processRequest() and
224 * processResponse() methods.
225 *
226 * @param request
227 * Request message
228 * @param session
229 * Session if message is processed in the message listener thread
230 * (worker is null or cannot process jobs), null otherwise.
231 */
232 protected void processMessage(Message request, Session session) {
233 try {
234 Session frontEndSession = session == null ? borrowSession() : session;
235 Destination frontEndReplyDestination = request.getJMSReplyTo() == null ? replyDestination : request.getJMSReplyTo();
236 Message frontEndReply = null;
237 try {
238 Session backEndSession = connection == backEndConnection ? session : borrowBackEndSession();
239 try {
240 long start = System.currentTimeMillis();
241 Message backEndRequest = processRequest(request, frontEndSession, backEndSession);
242 long afterProcessRequest = System.currentTimeMillis();
243 addMeasurement("process-request", afterProcessRequest-start, afterProcessRequest);
244 if (backEndRequest!=null) {
245 Destination brd = backEndReplyDestination;
246 if (brd==null) {
247 brd = backEndSession.createTemporaryQueue();
248 }
249 backEndRequest.setJMSReplyTo(brd);
250 MessageProducer backEndProducer = backEndSession.createProducer(backEndDestination);
251
252 String messageSelector;
253
254 try {
255 backEndProducer.send(backEndRequest);
256 messageSelector = messageSelector(backEndRequest);
257 } finally {
258 backEndProducer.close();
259 }
260
261 Message backEndReply;
262 MessageConsumer consumer;
263 if (brd instanceof TemporaryQueue) {
264 logger.debug(this, "Waiting on "+brd);
265 consumer = session.createConsumer(brd);
266 } else {
267 logger.debug(this, "Waiting for "+messageSelector+" on "+brd);
268 consumer = session.createConsumer(brd, messageSelector);
269 }
270
271 try {
272 backEndReply = backEndTimeout == 0 ? consumer.receive() : consumer.receive(backEndTimeout);
273 } finally {
274 consumer.close();
275 if (brd instanceof TemporaryQueue) {
276 ((TemporaryQueue) brd).delete();
277 }
278 }
279 long afterBackEndReply = System.currentTimeMillis();
280 addMeasurement("back-end", afterBackEndReply-afterProcessRequest, afterBackEndReply);
281
282 if (frontEndReplyDestination==null) {
283 logger.warn(this, "Unable to send reply to front-end - reply destination is null");
284 } else {
285 frontEndReply = processReply(backEndReply, request, frontEndSession, backEndSession);
286 long afterProcessReply = System.currentTimeMillis();
287 addMeasurement("process-reply", afterProcessReply-afterBackEndReply, afterProcessReply);
288 }
289 }
290 } catch (Exception e) {
291 frontEndReply = handleException(e, frontEndSession);
292 } finally {
293 if (backEndSession!=frontEndSession) {
294 releaseBackEndSession(backEndSession);
295 }
296 }
297
298 if (frontEndReply!=null) {
299 if (frontEndReply.getJMSCorrelationID()==null) {
300 frontEndReply.setJMSCorrelationID(request.getJMSCorrelationID()==null ? request.getJMSMessageID() : request.getJMSCorrelationID());
301 }
302 MessageProducer replyProducer = frontEndSession.createProducer(frontEndReplyDestination);
303 try {
304 if (replyTimeToLive!=0) {
305 replyProducer.setTimeToLive(replyTimeToLive);
306 }
307 replyProducer.send(frontEndReply);
308 } finally {
309 replyProducer.close();
310 }
311 }
312 } finally {
313 if (session == null) {
314 releaseSession(frontEndSession);
315 }
316 }
317 } catch (Exception e) {
318 logger.error(e, e.toString());
319 }
320 }
321
322 /**
323 * Processes front-end request message and builds back-end request message.
324 *
325 * @param request
326 * Request from front-end
327 * @param frontEndSession
328 * Front-end JMS session
329 * @param backEndSession
330 * Back-end JMS session. Equals to the front-end session if back
331 * end connection is not set and the front-end connection is used
332 * to connect to both front and back end.
333 * @return Message to be sent to the back-end request queue. Null if no message shall be sent (e.g. in the case of exception).
334 */
335 protected abstract Message processRequest(Message frontEndRequest, Session frontEndSession, Session backEndSession);
336
337 /**
338 * Processes back-end reply and forms reply to front-end. This method is
339 * responsible for setting correlation ID.
340 *
341 * @param backEndReply
342 * Reply message from back-end. Null if request timed out.
343 * @param frontEndRequest
344 * Original request from front end.
345 * @param frontEndSession
346 * Front-end JMS session.
347 * @param backEndSession
348 * Back-end JMS session. Equals to the front-end session if back
349 * end connection is not set and the front-end connection is used
350 * to connect to both front and back end.
351 * @return Message to be sent to the front-end reply queue. Null if no message shall be sent (e.g. in the case of exception).
352 */
353 protected abstract Message processReply(Message backEndReply, Message frontEndRequest, Session frontEndSession, Session backEndSession);
354
355 /**
356 * Method to report exceptions to fron-end (excluding JMS exceptions in front-end JMS objects). This implementation returns null.
357 * @param exception Exception to report
358 * @param frontEndSession Front-end JMS session
359 * @return Message to send to front-end to reply queue. Null if no message shall be sent.
360 */
361 protected Message handleException(Exception exception, Session frontEndSession) {
362 return null;
363 }
364
365 /**
366 * Extracts message selector for reply back-end message from the back-end
367 * request message. This implementation returns
368 * <code>"JMSCorrelationID='" +backEndRequest.getJMSMessageID()+"'"</code>
369 *
370 * @param backEndRequest
371 * @return Message selector to retrieve back-end reply.
372 * @throws JMSException
373 */
374 protected String messageSelector(Message backEndRequest) throws JMSException {
375 return "JMSCorrelationID='" + backEndRequest.getJMSMessageID() + "'";
376 }
377
378 /**
379 * Stops worker (thread pool), if any, and connection.
380 */
381 public void stop() throws ConfigurationException {
382 try {
383 logger.info(this, "Stopping ...");
384
385 if (backEndConnection != null && backEndConnection != connection) {
386 backEndConnection.close();
387 }
388
389 logger.info(this, "Stopped");
390 } catch (Exception e) {
391 if (e instanceof ConfigurationException) {
392 throw (ConfigurationException) e;
393 }
394
395 throw new ConfigurationException("Could not stop message processor");
396 }
397 }
398
399 /**
400 * This implementation simply creates a new session. Subclasses can override
401 * this method to implement session pooling.
402 *
403 * @return
404 * @throws JMSException
405 */
406 protected Session borrowBackEndSession() throws JMSException {
407 return backEndConnection.createSession(isBackEndTransacted, backEndAcknowledgeMode);
408 }
409
410 /**
411 * This implementation simply closes the session. Subclasses can override
412 * this method to implement session pooling.
413 *
414 * @throws JMSException
415 */
416 protected void releaseBackEndSession(Session session) throws JMSException {
417 session.close();
418 }
419
420 /**
421 * Provides access to back-end destination, reply destinations, initial
422 * context, and connection in addition to objects accessible through
423 * superclass getChild() method. Bridges to back-end initial context. For
424 * names in a form backEndInitialContext/<name> the name is looked up
425 * in initial context.
426 */
427 protected Object getChild(String name) {
428 if ("replyDestination".equals(name)) {
429 return replyDestination;
430 }
431
432 if ("backEndReplyDestination".equals(name)) {
433 return backEndReplyDestination;
434 }
435
436 if ("backEndDestination".equals(name)) {
437 return backEndDestination;
438 }
439
440 if ("backEndInitialContext".equals(name)) {
441 return backEndInitialContext;
442 }
443
444 if (name != null && name.startsWith(BACK_END_INITIAL_CONTEXT_PREFIX)) {
445 String jndiName = name.substring(BACK_END_INITIAL_CONTEXT_PREFIX.length());
446 try {
447 return backEndInitialContext.lookup(jndiName);
448 } catch (NamingException e) {
449 throw new RuntimeConfigurationException("Lookup in back-end inital context failed for JNDI name '" + jndiName + "':" + e, e);
450 }
451 }
452
453 if ("backEndConnection".equals(name)) {
454 return backEndConnection;
455 }
456
457 if ("backEndConnectionFactory".equals(name)) {
458 return backEndConnectionFactory;
459 }
460
461 return super.getChild(name);
462 }
463
464 }