001    /*
002     * Copyright (C) The Apache Software Foundation. All rights reserved.
003     *
004     * This software is published under the terms of the Apache Software License
005     * version 1.1, a copy of which has been included with this distribution in
006     * the LICENSE file.
007     * 
008     * $Id: ConsumerTask.java 155459 2005-02-26 13:24:44Z dirkv $
009     */
010    package org.apache.commons.messenger.task;
011    
012    import java.io.File;
013    import java.io.FileWriter;
014    import java.io.IOException;
015    
016    import javax.jms.Destination;
017    import javax.jms.JMSException;
018    import javax.jms.Message;
019    import javax.jms.TextMessage;
020    
021    import org.apache.commons.messenger.Messenger;
022    import org.apache.commons.messenger.MessengerManager;
023    import org.apache.tools.ant.BuildException;
024    import org.apache.tools.ant.Project;
025    import org.apache.tools.ant.Task;
026    
027    /** 
028     * <p><code>ConsumerTask</code> is an Ant task which will 
029     * publish all of the given text files as a JMS Text Message
030     * using a given JMS Connection (Messenger) and a Destination
031     *
032     * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
033     * @version $Revision: 155459 $
034     */
035    public class ConsumerTask extends Task {
036    
037        private Messenger messenger;
038        private String messengerName;
039        private Destination destination;
040        private String subject;
041        private MessengerManager messengerManager;    
042    
043        /** the number of messages to receive */
044        private int count;
045    
046        /** the output directory */
047        private File dir = new File(".");    
048        
049        // Properties
050        //-------------------------------------------------------------------------
051        
052    
053        /** 
054         * Sets the output directory 
055         */
056        public void setDir(File dir) {
057            this.dir = dir;
058        }
059    
060        public Messenger getMessenger() throws JMSException {
061            if ( messenger == null ) {
062                messenger = getMessengerManager().getMessenger( getMessengerName() );
063            }
064            return messenger;
065        }
066        
067        /** Sets the Messenger to be used */
068        public void setMessenger(Messenger messenger) {
069            this.messenger = messenger;
070        }
071        
072        /** Getter for property messengerName.
073         * @return Value of property messengerName.
074         */
075        public String getMessengerName() {
076            return messengerName;
077        }
078    
079        /** Setter for property messengerName.
080         * @param messengerName New value of property messengerName.
081         */
082        public void setMessengerName(String messengerName) {
083            this.messengerName = messengerName;
084        }
085    
086        /** Getter for property destination.
087         * @return Value of property destination.
088         */
089        public Destination getDestination() throws JMSException {
090            if ( destination == null ) {
091                destination = getMessenger().getDestination( getSubject() );
092            }
093            return destination;
094        }
095    
096        /** Setter for property destination.
097         * @param destination New value of property destination.
098         */
099        public void setDestination(Destination destination) {
100            this.destination = destination;
101        }
102    
103        /** Getter for property subject.
104         * @return Value of property subject.
105         */
106        public String getSubject() {
107            return subject;
108        }
109    
110        /** Setter for property subject.
111         * @param subject New value of property subject.
112         */
113        public void setSubject(String subject) {
114            this.subject = subject;
115        }
116        
117        
118        /** Getter for property messengerManager.
119         * @return Value of property messengerManager.
120         */
121        public MessengerManager getMessengerManager() {
122            return messengerManager;
123        }
124        
125        /** Setter for property messengerManager.
126         * @param messengerManager New value of property messengerManager.
127         */
128        public void setMessengerManager(MessengerManager messengerManager) {
129            this.messengerManager = messengerManager;
130        }
131    
132        /** 
133         * Sets the URI of the Messenger.xml configuration document to use
134         * to configure the messengers to use for this task.
135         */
136        public void setConfiguration(String uri) throws JMSException {
137            setMessengerManager( MessengerManager.load( uri ) );
138        }
139        
140        /** 
141         * @return the number of messages to receive.
142         * A number less than or equal to 0 will receive messages forever
143         */
144        public int getCount() {
145            return count;
146        }
147        
148        /** 
149         * Setter for the number of messages to receive.
150         * A number less than or equal to 0 will receive messages forever
151         */
152        public void setCount(int count) {
153            this.count = count;
154        }
155        
156        
157        // Task interface
158        //-------------------------------------------------------------------------
159        
160        /**
161         * Performs the copy operation.
162         */
163        public void execute() throws BuildException {
164            try {
165                Messenger messenger = getMessenger();
166                if ( messenger == null ) {
167                    throw new BuildException("Must specify a valid Messenger", location );
168                }
169                Destination destination = getDestination();
170                if ( destination == null ) {
171                    throw new BuildException("Must specify a valid JMS Destination", location );
172                }
173    
174                if ( count > 0 ) {
175                    log( "Will wait until I receive: " + count + " messages and will write to directory: " + dir );
176                    
177                    for ( int i = 0; i < count; i++ ) {
178                        Message message = messenger.receive( destination );
179                        processMessage( message );
180                    }
181                    
182                    log( "Finished." );
183                }
184                else {
185                    log( "Infinite loop. Will write to directory: " + dir );
186                    
187                    while (true) {
188                        Message message = messenger.receive( destination );
189                        processMessage( message );
190                    }
191                }
192            }
193            catch (IOException e) {
194                log( "Caught exception: " + e, Project.MSG_ERR );
195                throw new BuildException(e, location);
196            }
197            catch (JMSException e) {
198                log( "Caught exception: " + e, Project.MSG_ERR );
199                throw new BuildException(e, location);
200            }
201            finally {        
202                try {
203                    // close the JMS connection to release any background threads        
204                    messenger.close();
205                }
206                catch (Exception e) {
207                    // ignore close exceptions
208                }
209            }
210        }
211    
212        /**
213         * Processes a given message
214         */
215        protected void processMessage(Message message) throws IOException, JMSException {
216            log( "Received message to: " + message );
217    
218            String text = null;
219            if ( message instanceof TextMessage ) {
220                TextMessage textMessage = (TextMessage) message;
221                text = textMessage.toString();
222            }
223            else {
224                // #### bit of a hack!!!
225                // ideally we need an XML format for message persistence
226                text = message.toString();
227            }       
228            processMessageText(text);
229        }
230        
231        /** 
232         * Writes the given text to a file
233         */
234        protected void processMessageText(String text) throws IOException {
235            FileWriter writer = new FileWriter( dir );
236            writer.write ( text );
237            writer.close();
238        }
239    }
240    
241