001    /*
002     * Copyright (C) The Apache Software Foundation. All rights reserved.
003     *
004     * This software is published under the terms of the Apache Software License
005     * version 1.1, a copy of which has been included with this distribution in
006     * the LICENSE file.
007     * 
008     * $Id: XAConsumerThread.java 155459 2005-02-26 13:24:44Z dirkv $
009     */
010    package org.apache.commons.messagelet;
011    
012    import javax.jms.MessageListener;
013    import javax.transaction.SystemException;
014    import javax.transaction.Transaction;
015    import javax.transaction.TransactionManager;
016    import javax.transaction.xa.XAResource;
017    
018    import org.apache.commons.logging.Log;
019    import org.apache.commons.logging.LogFactory;
020    import org.apache.commons.messenger.Messenger;
021    import org.apache.commons.messenger.XACapable;
022    import org.apache.commons.messenger.XACapableAdapter;
023    
024    /** 
025     * <p><code>XAConsumerThread</code> is a thread which will perform XA processing
026     * of messages
027     *
028     * @author damon.hamacha
029     * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
030     * @version $Revision: 155459 $
031     */
032    public class XAConsumerThread extends ConsumerThread {
033    
034            /** Logger */
035            private static final Log log = LogFactory.getLog(XAConsumerThread.class);
036    
037            private TransactionManager transctionManager;
038            private Transaction transaction;
039    
040            public XAConsumerThread() {
041                    setName("XAConsumer" + getName());
042            }
043    
044            /**
045             * @return the TransactionManager to be used
046             * @throws SystemException
047             */
048            public TransactionManager getTransactionManager() throws SystemException {
049                    if (transctionManager == null) {
050                            transctionManager = createTransactionManager();
051                    }
052                    return transctionManager;
053            }
054    
055            /**
056             * Sets the transaction manager to be used
057             * 
058             * @param transctionManager the transaction manager to be used
059             */
060            public void setTransactionManager(TransactionManager transctionManager) {
061                    this.transctionManager = transctionManager;
062            }
063    
064            // Implementation methods
065        //-------------------------------------------------------------------------    
066            
067            /**
068             * Factory method to create a TransactionManager via some mechanism.
069             * By default this mechanism will lookup in JNDI 
070             */
071            protected TransactionManager createTransactionManager() throws SystemException {
072                    return null;
073            }
074            
075                            
076            /**
077             * Enlists any resources with the current transaction.
078             * Typically the input Messenger's Session will always be
079             * enlisted. Then if the current MessageListener implements XACapable
080             * then any of its resources will also be enlisted.
081             * 
082             * @param transaction the transaction to enlist resources with
083             * @throws Exception if the enlistment fails for whatever reason
084             */
085            protected void enlist(Transaction transaction) throws Exception {
086                    XACapable xaCapable = getXACapable( getMessenger() );
087                    xaCapable.enlistResources(transaction);
088            
089                    MessageListener listener = getListener();
090                    if (listener instanceof XACapable) {
091                            xaCapable = (XACapable) listener;
092                            xaCapable.enlistResources(transaction);
093                    }
094                    
095                    if (listener instanceof BridgeMDO) {
096                            BridgeMDO bridge = (BridgeMDO) listener;
097                            xaCapable = getXACapable( bridge.getOutputMessenger() );
098                            xaCapable.enlistResources(transaction);
099                    }
100            }
101                    
102    
103    
104            /**
105             * Delists any resources from the current transaction.
106             * This includes the current input Messenger's Session as well
107             * as any resources used by the MessageListener if it implements
108             * XACapable
109             * 
110             * @param transaction
111             * @param flag is the flag used by JTA when delisting resources.
112             * It is either XAResource.TMSUCCESS, XAResource.TMSUSPEND, or XAResource.TMFAIL
113             * @throws Exception
114             */
115            protected void delist(Transaction transaction, int flag) throws Exception {
116                    XACapable xaCapable = getXACapable( getMessenger() );
117                    xaCapable.delistResources(transaction, flag);
118                    
119                    MessageListener listener = getListener();
120                    if (listener instanceof XACapable) {
121                            xaCapable = (XACapable) listener;
122                            xaCapable.delistResources(transaction, flag);
123                    }
124    
125                    if (listener instanceof BridgeMDO) {
126                            BridgeMDO bridge = (BridgeMDO) listener;
127                            xaCapable = getXACapable( bridge.getOutputMessenger() );
128                            xaCapable.delistResources(transaction, flag);
129                    }
130            }
131    
132            /**
133             * Strategy method to represent the code required to start
134             * a transaction.
135             */
136            protected void startTransaction() throws Exception {
137                    getTransactionManager().begin();
138                    transaction = getTransactionManager().getTransaction();
139    
140                    enlist(transaction);
141            }
142    
143            /**
144             * Strategy method to represent the code required to commit
145             * a transaction.
146             */
147            protected void commitTransaction() throws Exception {
148                    delist(transaction, XAResource.TMSUCCESS);
149                    try {
150                            transaction.commit();
151                    }
152                    catch (Exception e) {
153                            log.error("Caught exception while committing txn: " + e, e);
154                            transaction.setRollbackOnly();
155                            throw e;                
156                    }
157            }
158    
159            /**
160             * Strategy method to represent the code required to rollback
161             * a transaction.
162             */
163            protected void rollbackTransaction() throws Exception {
164                    delist(transaction, XAResource.TMFAIL);
165                    transaction.rollback();
166            }
167    
168            /**
169             * Strategy method to represent the code required to cancel
170             * a transaction. 
171             * This is called when a message is not received.
172             */
173            protected void cancelTransaction() throws Exception {
174                    delist(transaction, XAResource.TMFAIL);
175                    transaction.rollback();
176            }
177    
178            /**
179             * @return an XACapable for the given Messenger
180             */
181            protected XACapable getXACapable(Messenger messenger) {         
182                    if (messenger instanceof XACapable) {
183                            return (XACapable) messenger;
184                    }
185                    return new XACapableAdapter(messenger);
186            }
187    }