001    package biz.hammurapi.jms;
002    
003    import java.util.logging.Level;
004    import java.util.logging.Logger;
005    
006    import javax.jms.Connection;
007    import javax.jms.ConnectionFactory;
008    import javax.jms.Destination;
009    import javax.jms.JMSException;
010    import javax.jms.Message;
011    import javax.jms.MessageConsumer;
012    import javax.jms.MessageProducer;
013    import javax.jms.Session;
014    import javax.jms.TemporaryQueue;
015    import javax.naming.Context;
016    import javax.naming.NamingException;
017    
018    import biz.hammurapi.config.ConfigurationException;
019    import biz.hammurapi.config.RuntimeConfigurationException;
020    
021    /**
022     * Base class for JMS adapters which operate on two queue pairs.
023     * 
024     * @author Pavel Vlasov
025     */
026    public abstract class Adapter extends MessageProcessor {
027    
028            private static final Logger logger = java.util.logging.Logger.getLogger(Adapter.class.getName());
029            
030            private static final String BACK_END_INITIAL_CONTEXT_PREFIX = "backEndInitialContext/";
031    
032            protected Context backEndInitialContext;
033    
034            private String backEndConnectionFactoryName;
035    
036            private String backEndDestinationName;
037    
038            private String backEndReplyDestinationName;
039    
040            private String replyDestinationName;
041    
042            /**
043             * Initial context to look-up back-end destinations and connection factory.
044             * If back-end initial context is not set then back-end JMS objects are
045             * looked up in the "front-end" initial context.
046             * 
047             * @param initialContext
048             */
049            public void setBackEndInitialContext(Context initialContext) {
050                    this.initialContext = initialContext;
051            }
052    
053            /**
054             * Back-end request destination (queue or topic) name.
055             * 
056             * @param destinationName
057             */
058            public void setBackEndDestination(String destinationName) {
059                    this.backEndDestinationName = destinationName;
060            }
061    
062            /**
063             * Back-end reply destination (queue or topic) name.
064             * 
065             * @param destinationName
066             */
067            public void setBackEndReplyDestination(String destinationName) {
068                    this.backEndReplyDestinationName = destinationName;
069            }
070    
071            /**
072             * Front-end reply destination (queue or topic) name.
073             * 
074             * @param destinationName
075             */
076            public void setReplyDestination(String destinationName) {
077                    this.replyDestinationName = destinationName;
078            }
079    
080            /**
081             * Back-end JMS connection name. It it is not set then the "front-end"
082             * connection factory is used.
083             * 
084             * @param connectionName
085             */
086            public void setBackEndConnectionFactory(String connectionFactoryName) {
087                    this.backEndConnectionFactoryName = connectionFactoryName;
088            }
089    
090            protected Connection backEndConnection;
091    
092            private Destination backEndDestination;
093    
094            private Destination backEndReplyDestination;
095    
096            private Destination replyDestination;
097    
098            protected boolean isBackEndTransacted;
099    
100            protected int backEndAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
101            
102            protected long replyTimeToLive;
103            
104            protected long backEndRequestTimeToLive;
105            
106            protected long backEndTimeout;
107            
108            /**
109             * Timeout for backend reply. 
110             * @param backEndTimeout
111             */
112            public void setBackEndTimeout(long backEndTimeout) {
113                    this.backEndTimeout = backEndTimeout;
114            }
115            
116            /**
117             * Time to live for reply.
118             * @param replyTimeToLive
119             */
120            public void setReplyTimeToLive(long replyTimeToLive) {
121                    this.replyTimeToLive = replyTimeToLive;
122            }
123            
124            /**
125             * Time to live for back-end request
126             * @param backEndRequestTimeToLive
127             */
128            public void setBackEndRequestTimeToLive(long backEndRequestTimeToLive) {
129                    this.backEndRequestTimeToLive = backEndRequestTimeToLive;
130            }
131    
132            /**
133             * Back-end acknoledge mode. Valid values: AUTO (default), CLIENT, DUPS_OK.
134             * 
135             * @param acknowledgeModeName
136             */
137            public void setBackEndAcknowledgeMode(String acknowledgeModeName) {
138                    try {
139                            this.backEndAcknowledgeMode = Session.class.getField(acknowledgeModeName + "_ACKNOWLEDGE").getInt(null);
140                    } catch (Exception e) {
141                            throw new IllegalArgumentException("Invalid back-end acknowledge mode '" + acknowledgeModeName + "': " + e);
142                    }
143            }
144    
145            /**
146             * Explicitly sets back-end connection. This method is useful when several
147             * components share one connection.
148             * 
149             * @param connection
150             */
151            public void setBackEndConnection(Connection connection) {
152                    this.backEndConnection = connection;
153            }
154    
155            /**
156             * Transactional attribute of back-end JMS Session.
157             * 
158             * @param isTransacted
159             */
160            public void setBackEndTransacted(boolean isTransacted) {
161                    this.isBackEndTransacted = isTransacted;
162            }
163    
164            private String backEndUser;
165    
166            private String backEndPwd;
167    
168            protected ConnectionFactory backEndConnectionFactory;
169    
170            /**
171             * Back-end JMS Connection user name. Optional.
172             * 
173             * @param user
174             */
175            public void setBackEndUser(String user) {
176                    this.backEndUser = user;
177            }
178    
179            /**
180             * Back-end JMS Connection password. Optional.
181             * 
182             * @param pwd
183             */
184            public void setPassword(String pwd) {
185                    this.backEndPwd = pwd;
186            }
187    
188            public void start() throws ConfigurationException {
189                    super.start();
190                    try {
191                            logger.info("Starting ...");
192    
193                            if (backEndConnectionFactoryName == null) {
194                                    backEndConnectionFactory = connectionFactory;
195                                    backEndConnection = connection;
196                            } else {
197                                    backEndConnectionFactory = (ConnectionFactory) initialContext.lookup(backEndConnectionFactoryName);
198                                    if (backEndUser == null) {
199                                            backEndConnection = backEndConnectionFactory.createConnection();
200                                    } else {
201                                            backEndConnection = backEndConnectionFactory.createConnection(backEndUser, backEndPwd);
202                                    }
203                                    backEndConnection.start();
204                            }
205    
206                            if (backEndInitialContext == null) {
207                                    backEndInitialContext = initialContext;
208                            }
209    
210                            backEndDestination = (Destination) backEndInitialContext.lookup(backEndDestinationName);
211                            backEndReplyDestination = (Destination) backEndInitialContext.lookup(backEndReplyDestinationName);
212    
213                            if (replyDestinationName!=null) {
214                                    replyDestination = (Destination) initialContext.lookup(replyDestinationName);
215                            }
216    
217                            logger.info("Started");
218                    } catch (Exception e) {
219                            if (e instanceof ConfigurationException) {
220                                    throw (ConfigurationException) e;
221                            }
222    
223                            throw new ConfigurationException("Could not start message processor: " + e, e);
224                    }
225            }
226    
227            /**
228             * Processes request message by passing to processRequest() and
229             * processResponse() methods.
230             * 
231             * @param request
232             *            Request message
233             * @param session
234             *            Session if message is processed in the message listener thread
235             *            (worker is null or cannot process jobs), null otherwise.
236             */
237            protected void processMessage(Message request, Session session) {
238                    try {
239                            Session frontEndSession = session == null ? borrowSession() : session;
240                            Destination frontEndReplyDestination = request.getJMSReplyTo() == null ? replyDestination : request.getJMSReplyTo();
241                            Message frontEndReply = null;
242                            try {
243                                    Session backEndSession = connection == backEndConnection ? session : borrowBackEndSession();
244                                    try {
245                                            long start = System.currentTimeMillis();
246                                            Message backEndRequest = processRequest(request, frontEndSession, backEndSession);
247                                            long afterProcessRequest = System.currentTimeMillis();
248                                            addMeasurement("process-request", afterProcessRequest-start, afterProcessRequest);
249                                            if (backEndRequest!=null) {                                     
250                                                    Destination brd = backEndReplyDestination;
251                                                    if (brd==null) {
252                                                            brd = backEndSession.createTemporaryQueue();
253                                                    }
254                                                    backEndRequest.setJMSReplyTo(brd);
255                                        MessageProducer backEndProducer = backEndSession.createProducer(backEndDestination); 
256                            
257                                        String messageSelector;
258                            
259                                        try {
260                                            backEndProducer.send(backEndRequest); 
261                                            messageSelector = messageSelector(backEndRequest);
262                                        } finally {
263                                            backEndProducer.close();
264                                        }
265                            
266                                        Message backEndReply;
267                                                    MessageConsumer consumer;
268                                                    if (brd instanceof TemporaryQueue) {
269                                                logger.fine("Waiting on "+brd);            
270                                                            consumer = session.createConsumer(brd);
271                                                    } else {
272                                                logger.fine("Waiting for "+messageSelector+" on "+brd);                 
273                                                            consumer = session.createConsumer(brd, messageSelector);
274                                                    }
275                                                            
276                                                    try {
277                                                            backEndReply = backEndTimeout == 0 ? consumer.receive() : consumer.receive(backEndTimeout);
278                                        } finally {
279                                            consumer.close();
280                                                            if (brd instanceof TemporaryQueue) {
281                                                                    ((TemporaryQueue) brd).delete();
282                                                            }
283                                        }
284                                        long afterBackEndReply = System.currentTimeMillis();
285                                        addMeasurement("back-end", afterBackEndReply-afterProcessRequest, afterBackEndReply);
286                            
287                                                    if (frontEndReplyDestination==null) {
288                                                            logger.warning("Unable to send reply to front-end - reply destination is null");
289                                                    } else {
290                                                            frontEndReply = processReply(backEndReply, request, frontEndSession, backEndSession);
291                                                            long afterProcessReply = System.currentTimeMillis();
292                                                            addMeasurement("process-reply", afterProcessReply-afterBackEndReply, afterProcessReply);                                                        
293                                                    }
294                                            }
295                                    } catch (Exception e) {
296                                            frontEndReply = handleException(e, frontEndSession);
297                                    } finally {
298                                            if (backEndSession!=frontEndSession) {
299                                                    releaseBackEndSession(backEndSession);
300                                            }
301                                    }
302                                    
303                                    if (frontEndReply!=null) {
304                                            if (frontEndReply.getJMSCorrelationID()==null) {
305                                                    frontEndReply.setJMSCorrelationID(request.getJMSCorrelationID()==null ? request.getJMSMessageID() : request.getJMSCorrelationID());
306                                            }
307                                            MessageProducer replyProducer = frontEndSession.createProducer(frontEndReplyDestination);
308                                            try {
309                                                    if (replyTimeToLive!=0) {
310                                                            replyProducer.setTimeToLive(replyTimeToLive);   
311                                                    }
312                                                    replyProducer.send(frontEndReply);
313                                            } finally {
314                                                    replyProducer.close();
315                                            }
316                                    }
317                            } finally {
318                                    if (session == null) {
319                                            releaseSession(frontEndSession);
320                                    }
321                            }
322                    } catch (Exception e) {
323                            logger.log(Level.SEVERE, "Exception: "+e, e);
324                    }
325            }
326    
327            /**
328             * Processes front-end request message and builds back-end request message.
329             * 
330             * @param request
331             *            Request from front-end
332             * @param frontEndSession
333             *            Front-end JMS session
334             * @param backEndSession
335             *            Back-end JMS session. Equals to the front-end session if back
336             *            end connection is not set and the front-end connection is used
337             *            to connect to both front and back end.
338             * @return Message to be sent to the back-end request queue. Null if no message shall be sent (e.g. in the case of exception).
339             */
340            protected abstract Message processRequest(Message frontEndRequest, Session frontEndSession, Session backEndSession);
341    
342            /**
343             * Processes back-end reply and forms reply to front-end. This method is
344             * responsible for setting correlation ID.
345             * 
346             * @param backEndReply
347             *            Reply message from back-end. Null if request timed out.
348             * @param frontEndRequest
349             *            Original request from front end.
350             * @param frontEndSession
351             *            Front-end JMS session.
352             * @param backEndSession
353             *            Back-end JMS session. Equals to the front-end session if back
354             *            end connection is not set and the front-end connection is used
355             *            to connect to both front and back end.
356             * @return Message to be sent to the front-end reply queue. Null if no message shall be sent (e.g. in the case of exception).
357             */
358            protected abstract Message processReply(Message backEndReply, Message frontEndRequest, Session frontEndSession, Session backEndSession);
359            
360            /**
361             * Method to report exceptions to fron-end (excluding JMS exceptions in front-end JMS objects). This implementation returns null.
362             * @param exception Exception to report
363             * @param frontEndSession Front-end JMS session
364             * @return Message to send to front-end to reply queue. Null if no message shall be sent.
365             */
366            protected Message handleException(Exception exception, Session frontEndSession) {
367                    return null;
368            }
369    
370            /**
371             * Extracts message selector for reply back-end message from the back-end
372             * request message. This implementation returns
373             * <code>"JMSCorrelationID='" +backEndRequest.getJMSMessageID()+"'"</code>
374             * 
375             * @param backEndRequest
376             * @return Message selector to retrieve back-end reply.
377             * @throws JMSException
378             */
379            protected String messageSelector(Message backEndRequest) throws JMSException {
380                    return "JMSCorrelationID='" + backEndRequest.getJMSMessageID() + "'";
381            }
382    
383            /**
384             * Stops worker (thread pool), if any, and connection.
385             */
386            public void stop() throws ConfigurationException {
387                    try {
388                            logger.info("Stopping ...");
389    
390                            if (backEndConnection != null && backEndConnection != connection) {
391                                    backEndConnection.close();
392                            }
393    
394                            logger.info("Stopped");
395                    } catch (Exception e) {
396                            if (e instanceof ConfigurationException) {
397                                    throw (ConfigurationException) e;
398                            }
399    
400                            throw new ConfigurationException("Could not stop message processor");
401                    }
402            }
403    
404            /**
405             * This implementation simply creates a new session. Subclasses can override
406             * this method to implement session pooling.
407             * 
408             * @return
409             * @throws JMSException
410             */
411            protected Session borrowBackEndSession() throws JMSException {
412                    return backEndConnection.createSession(isBackEndTransacted,     backEndAcknowledgeMode);
413            }
414    
415            /**
416             * This implementation simply closes the session. Subclasses can override
417             * this method to implement session pooling.
418             * 
419             * @throws JMSException
420             */
421            protected void releaseBackEndSession(Session session) throws JMSException {
422                    session.close();
423            }
424    
425            /**
426             * Provides access to back-end destination, reply destinations, initial
427             * context, and connection in addition to objects accessible through
428             * superclass getChild() method. Bridges to back-end initial context. For
429             * names in a form backEndInitialContext/&lt;name&gt; the name is looked up
430             * in initial context.
431             */
432            protected Object getChild(String name) {
433                    if ("replyDestination".equals(name)) {
434                            return replyDestination;
435                    }
436    
437                    if ("backEndReplyDestination".equals(name)) {
438                            return backEndReplyDestination;
439                    }
440    
441                    if ("backEndDestination".equals(name)) {
442                            return backEndDestination;
443                    }
444    
445                    if ("backEndInitialContext".equals(name)) {
446                            return backEndInitialContext;
447                    }
448    
449                    if (name != null && name.startsWith(BACK_END_INITIAL_CONTEXT_PREFIX)) {
450                            String jndiName = name.substring(BACK_END_INITIAL_CONTEXT_PREFIX.length());
451                            try {
452                                    return backEndInitialContext.lookup(jndiName);
453                            } catch (NamingException e) {
454                                    throw new RuntimeConfigurationException("Lookup in back-end inital context failed for JNDI name '" + jndiName + "':" + e, e);
455                            }
456                    }
457    
458                    if ("backEndConnection".equals(name)) {
459                            return backEndConnection;
460                    }
461    
462                    if ("backEndConnectionFactory".equals(name)) {
463                            return backEndConnectionFactory;
464                    }
465    
466                    return super.getChild(name);
467            }
468    
469    }