001 /*
002 * Copyright (C) The Apache Software Foundation. All rights reserved.
003 *
004 * This software is published under the terms of the Apache Software License
005 * version 1.1, a copy of which has been included with this distribution in
006 * the LICENSE file.
007 *
008 * $Id: ConsumerTask.java 155459 2005-02-26 13:24:44Z dirkv $
009 */
010 package org.apache.commons.messenger.task;
011
012 import java.io.File;
013 import java.io.FileWriter;
014 import java.io.IOException;
015
016 import javax.jms.Destination;
017 import javax.jms.JMSException;
018 import javax.jms.Message;
019 import javax.jms.TextMessage;
020
021 import org.apache.commons.messenger.Messenger;
022 import org.apache.commons.messenger.MessengerManager;
023 import org.apache.tools.ant.BuildException;
024 import org.apache.tools.ant.Project;
025 import org.apache.tools.ant.Task;
026
027 /**
028 * <p><code>ConsumerTask</code> is an Ant task which will
029 * publish all of the given text files as a JMS Text Message
030 * using a given JMS Connection (Messenger) and a Destination
031 *
032 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
033 * @version $Revision: 155459 $
034 */
035 public class ConsumerTask extends Task {
036
037 private Messenger messenger;
038 private String messengerName;
039 private Destination destination;
040 private String subject;
041 private MessengerManager messengerManager;
042
043 /** the number of messages to receive */
044 private int count;
045
046 /** the output directory */
047 private File dir = new File(".");
048
049 // Properties
050 //-------------------------------------------------------------------------
051
052
053 /**
054 * Sets the output directory
055 */
056 public void setDir(File dir) {
057 this.dir = dir;
058 }
059
060 public Messenger getMessenger() throws JMSException {
061 if ( messenger == null ) {
062 messenger = getMessengerManager().getMessenger( getMessengerName() );
063 }
064 return messenger;
065 }
066
067 /** Sets the Messenger to be used */
068 public void setMessenger(Messenger messenger) {
069 this.messenger = messenger;
070 }
071
072 /** Getter for property messengerName.
073 * @return Value of property messengerName.
074 */
075 public String getMessengerName() {
076 return messengerName;
077 }
078
079 /** Setter for property messengerName.
080 * @param messengerName New value of property messengerName.
081 */
082 public void setMessengerName(String messengerName) {
083 this.messengerName = messengerName;
084 }
085
086 /** Getter for property destination.
087 * @return Value of property destination.
088 */
089 public Destination getDestination() throws JMSException {
090 if ( destination == null ) {
091 destination = getMessenger().getDestination( getSubject() );
092 }
093 return destination;
094 }
095
096 /** Setter for property destination.
097 * @param destination New value of property destination.
098 */
099 public void setDestination(Destination destination) {
100 this.destination = destination;
101 }
102
103 /** Getter for property subject.
104 * @return Value of property subject.
105 */
106 public String getSubject() {
107 return subject;
108 }
109
110 /** Setter for property subject.
111 * @param subject New value of property subject.
112 */
113 public void setSubject(String subject) {
114 this.subject = subject;
115 }
116
117
118 /** Getter for property messengerManager.
119 * @return Value of property messengerManager.
120 */
121 public MessengerManager getMessengerManager() {
122 return messengerManager;
123 }
124
125 /** Setter for property messengerManager.
126 * @param messengerManager New value of property messengerManager.
127 */
128 public void setMessengerManager(MessengerManager messengerManager) {
129 this.messengerManager = messengerManager;
130 }
131
132 /**
133 * Sets the URI of the Messenger.xml configuration document to use
134 * to configure the messengers to use for this task.
135 */
136 public void setConfiguration(String uri) throws JMSException {
137 setMessengerManager( MessengerManager.load( uri ) );
138 }
139
140 /**
141 * @return the number of messages to receive.
142 * A number less than or equal to 0 will receive messages forever
143 */
144 public int getCount() {
145 return count;
146 }
147
148 /**
149 * Setter for the number of messages to receive.
150 * A number less than or equal to 0 will receive messages forever
151 */
152 public void setCount(int count) {
153 this.count = count;
154 }
155
156
157 // Task interface
158 //-------------------------------------------------------------------------
159
160 /**
161 * Performs the copy operation.
162 */
163 public void execute() throws BuildException {
164 try {
165 Messenger messenger = getMessenger();
166 if ( messenger == null ) {
167 throw new BuildException("Must specify a valid Messenger", location );
168 }
169 Destination destination = getDestination();
170 if ( destination == null ) {
171 throw new BuildException("Must specify a valid JMS Destination", location );
172 }
173
174 if ( count > 0 ) {
175 log( "Will wait until I receive: " + count + " messages and will write to directory: " + dir );
176
177 for ( int i = 0; i < count; i++ ) {
178 Message message = messenger.receive( destination );
179 processMessage( message );
180 }
181
182 log( "Finished." );
183 }
184 else {
185 log( "Infinite loop. Will write to directory: " + dir );
186
187 while (true) {
188 Message message = messenger.receive( destination );
189 processMessage( message );
190 }
191 }
192 }
193 catch (IOException e) {
194 log( "Caught exception: " + e, Project.MSG_ERR );
195 throw new BuildException(e, location);
196 }
197 catch (JMSException e) {
198 log( "Caught exception: " + e, Project.MSG_ERR );
199 throw new BuildException(e, location);
200 }
201 finally {
202 try {
203 // close the JMS connection to release any background threads
204 messenger.close();
205 }
206 catch (Exception e) {
207 // ignore close exceptions
208 }
209 }
210 }
211
212 /**
213 * Processes a given message
214 */
215 protected void processMessage(Message message) throws IOException, JMSException {
216 log( "Received message to: " + message );
217
218 String text = null;
219 if ( message instanceof TextMessage ) {
220 TextMessage textMessage = (TextMessage) message;
221 text = textMessage.toString();
222 }
223 else {
224 // #### bit of a hack!!!
225 // ideally we need an XML format for message persistence
226 text = message.toString();
227 }
228 processMessageText(text);
229 }
230
231 /**
232 * Writes the given text to a file
233 */
234 protected void processMessageText(String text) throws IOException {
235 FileWriter writer = new FileWriter( dir );
236 writer.write ( text );
237 writer.close();
238 }
239 }
240
241