001 /*
002 * Copyright (C) The Apache Software Foundation. All rights reserved.
003 *
004 * This software is published under the terms of the Apache Software License
005 * version 1.1, a copy of which has been included with this distribution in
006 * the LICENSE file.
007 *
008 * $Id: XAConsumerThread.java 155459 2005-02-26 13:24:44Z dirkv $
009 */
010 package org.apache.commons.messagelet;
011
012 import javax.jms.MessageListener;
013 import javax.transaction.SystemException;
014 import javax.transaction.Transaction;
015 import javax.transaction.TransactionManager;
016 import javax.transaction.xa.XAResource;
017
018 import org.apache.commons.logging.Log;
019 import org.apache.commons.logging.LogFactory;
020 import org.apache.commons.messenger.Messenger;
021 import org.apache.commons.messenger.XACapable;
022 import org.apache.commons.messenger.XACapableAdapter;
023
024 /**
025 * <p><code>XAConsumerThread</code> is a thread which will perform XA processing
026 * of messages
027 *
028 * @author damon.hamacha
029 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
030 * @version $Revision: 155459 $
031 */
032 public class XAConsumerThread extends ConsumerThread {
033
034 /** Logger */
035 private static final Log log = LogFactory.getLog(XAConsumerThread.class);
036
037 private TransactionManager transctionManager;
038 private Transaction transaction;
039
040 public XAConsumerThread() {
041 setName("XAConsumer" + getName());
042 }
043
044 /**
045 * @return the TransactionManager to be used
046 * @throws SystemException
047 */
048 public TransactionManager getTransactionManager() throws SystemException {
049 if (transctionManager == null) {
050 transctionManager = createTransactionManager();
051 }
052 return transctionManager;
053 }
054
055 /**
056 * Sets the transaction manager to be used
057 *
058 * @param transctionManager the transaction manager to be used
059 */
060 public void setTransactionManager(TransactionManager transctionManager) {
061 this.transctionManager = transctionManager;
062 }
063
064 // Implementation methods
065 //-------------------------------------------------------------------------
066
067 /**
068 * Factory method to create a TransactionManager via some mechanism.
069 * By default this mechanism will lookup in JNDI
070 */
071 protected TransactionManager createTransactionManager() throws SystemException {
072 return null;
073 }
074
075
076 /**
077 * Enlists any resources with the current transaction.
078 * Typically the input Messenger's Session will always be
079 * enlisted. Then if the current MessageListener implements XACapable
080 * then any of its resources will also be enlisted.
081 *
082 * @param transaction the transaction to enlist resources with
083 * @throws Exception if the enlistment fails for whatever reason
084 */
085 protected void enlist(Transaction transaction) throws Exception {
086 XACapable xaCapable = getXACapable( getMessenger() );
087 xaCapable.enlistResources(transaction);
088
089 MessageListener listener = getListener();
090 if (listener instanceof XACapable) {
091 xaCapable = (XACapable) listener;
092 xaCapable.enlistResources(transaction);
093 }
094
095 if (listener instanceof BridgeMDO) {
096 BridgeMDO bridge = (BridgeMDO) listener;
097 xaCapable = getXACapable( bridge.getOutputMessenger() );
098 xaCapable.enlistResources(transaction);
099 }
100 }
101
102
103
104 /**
105 * Delists any resources from the current transaction.
106 * This includes the current input Messenger's Session as well
107 * as any resources used by the MessageListener if it implements
108 * XACapable
109 *
110 * @param transaction
111 * @param flag is the flag used by JTA when delisting resources.
112 * It is either XAResource.TMSUCCESS, XAResource.TMSUSPEND, or XAResource.TMFAIL
113 * @throws Exception
114 */
115 protected void delist(Transaction transaction, int flag) throws Exception {
116 XACapable xaCapable = getXACapable( getMessenger() );
117 xaCapable.delistResources(transaction, flag);
118
119 MessageListener listener = getListener();
120 if (listener instanceof XACapable) {
121 xaCapable = (XACapable) listener;
122 xaCapable.delistResources(transaction, flag);
123 }
124
125 if (listener instanceof BridgeMDO) {
126 BridgeMDO bridge = (BridgeMDO) listener;
127 xaCapable = getXACapable( bridge.getOutputMessenger() );
128 xaCapable.delistResources(transaction, flag);
129 }
130 }
131
132 /**
133 * Strategy method to represent the code required to start
134 * a transaction.
135 */
136 protected void startTransaction() throws Exception {
137 getTransactionManager().begin();
138 transaction = getTransactionManager().getTransaction();
139
140 enlist(transaction);
141 }
142
143 /**
144 * Strategy method to represent the code required to commit
145 * a transaction.
146 */
147 protected void commitTransaction() throws Exception {
148 delist(transaction, XAResource.TMSUCCESS);
149 try {
150 transaction.commit();
151 }
152 catch (Exception e) {
153 log.error("Caught exception while committing txn: " + e, e);
154 transaction.setRollbackOnly();
155 throw e;
156 }
157 }
158
159 /**
160 * Strategy method to represent the code required to rollback
161 * a transaction.
162 */
163 protected void rollbackTransaction() throws Exception {
164 delist(transaction, XAResource.TMFAIL);
165 transaction.rollback();
166 }
167
168 /**
169 * Strategy method to represent the code required to cancel
170 * a transaction.
171 * This is called when a message is not received.
172 */
173 protected void cancelTransaction() throws Exception {
174 delist(transaction, XAResource.TMFAIL);
175 transaction.rollback();
176 }
177
178 /**
179 * @return an XACapable for the given Messenger
180 */
181 protected XACapable getXACapable(Messenger messenger) {
182 if (messenger instanceof XACapable) {
183 return (XACapable) messenger;
184 }
185 return new XACapableAdapter(messenger);
186 }
187 }