View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.pipeline.stage;
19  
20  import java.io.File;
21  import java.io.FileOutputStream;
22  import java.io.IOException;
23  import java.io.OutputStream;
24  import java.util.Calendar;
25  import java.util.Date;
26  import java.util.HashSet;
27  import java.util.regex.Pattern;
28  import java.util.Set;
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.commons.net.ftp.FTPClient;
32  import org.apache.commons.net.ftp.FTPFile;
33  import org.apache.commons.net.ftp.FTPReply;
34  import org.apache.commons.pipeline.StageException;
35  
36  /**
37   * <p>This {@link org.apache.commons.pipeline.Pipeline$Stage Stage} provides the
38   * functionality needed to retrieve data from an FTP URL. Multipart responses
39   * are not yet supported.</p>
40   */
41  public class FtpFileDownloadStage extends BaseStage {
42      private final Log log = LogFactory.getLog(FtpFileDownloadStage.class);
43      
44      private String workDir = "/tmp";
45      private File fworkDir;
46      private FTPClient client = new FTPClient();
47      
48      /** Holds value of property host. */
49      private String host;
50      
51      /** Holds value of property user. */
52      private String user;
53      
54      /** Holds value of property password. */
55      private String password;
56      
57      /** Holds value of property port.     */
58      private int port;
59  
60      /**
61       * Default constructor - creates work directory in /tmp
62       */
63      public FtpFileDownloadStage() {
64      }
65      
66      /**
67       * Constructor specifying work directory.
68       * @param workDir local directory in which to store downloaded files
69       */
70      public FtpFileDownloadStage(String workDir) {
71          this.workDir = workDir;
72      }
73      
74      /**
75       * Creates the download directory {@link #setWorkDir(String) workDir} uf it does
76       * not exist and makes a connection to the remote FTP server.
77       * @throws org.apache.commons.pipeline.StageException if a connection to the remote FTP server cannot be established, or the login to
78       * the remote system fails
79       */
80      public void preprocess() throws StageException {
81          super.preprocess();
82          if (fworkDir == null) fworkDir = new File(workDir);
83          if (!this.fworkDir.exists()) fworkDir.mkdirs();
84          
85          try {
86              //connect to the ftp site
87              client.connect(host, port);
88              log.debug(client.getReplyString());
89              if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
90                  throw new IOException("FTP server at host " + host + " refused connection.");
91              }
92              
93              client.login(user, password);
94              log.debug(client.getReplyString());
95              if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
96                  throw new StageException(this, "FTP login failed for user " + user + ": " + client.getReplyString());
97              }
98          } catch (IOException e) {
99              throw new StageException(this, e);
100         }
101     }
102     
103     /**
104      * Retrieves files that match the specified FileSpec from the FTP server
105      * and stores them in the work directory.
106      * @param obj incoming {@link FileSpec} that indicates the file to download along with some flags to
107      * control the download behavior
108      * @throws org.apache.commons.pipeline.StageException if there are errors navigating the remote directory structure or file download 
109      * fails
110      */
111     public void process(Object obj) throws StageException {
112         if (!this.fworkDir.exists()) throw new StageException(this, "The work directory for file download " + workDir.toString() + " does not exist.");
113         
114         FileSpec spec = (FileSpec) obj;
115         
116         try {
117             client.setFileType(spec.type.intValue());
118             client.changeWorkingDirectory(spec.path);
119             if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
120                 throw new IOException("FTP client could not change to remote directory " + spec.path + ": " + client.getReplyString());
121             }
122             
123             log.debug("FTP connection successfully established to " + host + ":" + spec.path);
124             
125             //get the list of files
126             client.enterLocalPassiveMode();
127             searchCurrentDirectory("", spec);
128         } catch (IOException e) {
129             throw new StageException(this, e);
130         }
131     }
132     
133     
134     /**
135      * Search the current working directory of the FTP client, saving files
136      * to the path specified by workDir + the path to the file on the FTP server.
137      * This method will optionally recursively search directories on the remote server.
138      */
139     private void searchCurrentDirectory(String path, FileSpec spec) throws IOException {
140         FTPFile[] files = client.listFiles();
141         if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
142             throw new IOException("FTP client could not obtain file list : " + client.getReplyString());
143         }
144         
145         search: for (FTPFile file : files) {
146             String localPath = path + File.separatorChar + file.getName();
147             
148             if (file.isDirectory() && spec.recursive) {
149                 log.debug("Recursing into directory " + file.getName());
150                 client.changeWorkingDirectory(file.getName());
151                 searchCurrentDirectory(localPath, spec);
152                 client.changeToParentDirectory();
153             } else {
154                 log.debug("Examining file " + localPath);
155                 for (Criterion crit : spec.criteria) {
156                     if (!crit.matches(file)) {
157                         log.info("File " + localPath + " failed criterion check " + crit);
158                         continue search;
159                     }
160                 }
161                 
162                 boolean getFile = true;
163                 File localFile = new File(workDir + File.separatorChar + localPath);
164                 if (localFile.exists()) {
165                     if (spec.overwrite) {
166                         log.info("Replacing existing local file " + localFile.getPath());
167                         getFile = true;
168                     } else {
169                         if (spec.ignoreExisting) {
170                             log.info("Ignoring existing local file " + localFile.getPath());
171                             continue search;
172                 } else {
173                             log.info("Using existing local file " + localFile.getPath());
174                             getFile = false;
175                         }
176                     }
177                 } else {
178                     getFile = true;
179                 }
180                 
181                 if (getFile) {
182                     if (! localFile.getParentFile().exists()) localFile.getParentFile().mkdir();
183                     
184                     OutputStream out = new FileOutputStream(localFile);
185                     try {
186                         client.retrieveFile(file.getName(), out);
187                     } finally {
188                         out.flush();
189                         out.close();
190                     }
191                 }
192                 
193                 this.emit(localFile);
194             }
195         }
196     }
197     
198     /**
199      * Disconnects from FTP server. Errors are logged.
200      */
201     public void release() {
202         try {
203             client.disconnect(); //close ftp connection
204         } catch (IOException e) {
205             log.error(e.getMessage(), e);
206         }
207     }
208     
209     /**
210      * Sets the working directory for the file download. If the directory does
211      * not already exist, it will be created during the preprocess() step.
212      * @param workDir local directory to receive file downloads
213      */
214     public void setWorkDir(String workDir) {
215         this.workDir = workDir;
216     }
217     
218     /**
219      * Returns the name of the file download directory.
220      * @return the string containing the local working directory
221      */
222     public String getWorkDir() {
223         return this.workDir;
224     }
225     
226     /** Getter for property host.
227      * @return Value of property host.
228      *
229      */
230     public String getHost() {
231         return this.host;
232     }
233     
234     /** Setter for property host.
235      * @param host New value of property host.
236      *
237      */
238     public void setHost(String host) {
239         this.host = host;
240     }
241     
242     /** Getter for property user.
243      * @return Value of property user.
244      *
245      */
246     public String getUser() {
247         return this.user;
248     }
249     
250     /** Setter for property user.
251      * @param user New value of property user.
252      *
253      */
254     public void setUser(String user) {
255         this.user = user;
256     }
257     
258     /** Setter for property password.
259      * @param password New value of property password.
260      *
261      */
262     public void setPassword(String password) {
263         this.password = password;
264     }
265     
266     /**
267      * Getter for property port.
268      * @return Value of property port.
269      */
270     public int getPort() {
271         return this.port;
272     }
273 
274     /**
275      * Setter for property port.
276      * @param port New value of property port.
277      */
278     public void setPort(int port) {
279         this.port = port;
280     }
281     
282     /**
283      * This class is used to specify a path and pattern of file for the FtpFileDownload
284      * to retrieve. There are some parameters that can be configured in the filespec
285      * that will control download behavior for <CODE>recursive</CODE> searching, the 
286      * <CODE>overwrite</CODE> of locally existing files, and to 
287      * <CODE>ignoreExisting</CODE> files.
288      * <p>
289      * If a file already exists in the local directory, it is only replaced if 
290      * <CODE>overwrite</CODE> is set to <CODE>true</CODE>. If it is replaced, then the
291      * filename is passed on to the next stage. Existing files are passed on to the
292      * stage unless <CODE>ignoreExisting</CODE> is <CODE>true</CODE>. Note that the
293      * <CODE>ignoreExisting</CODE> flag is only used if <CODE>overwrite</CODE> is 
294      * <CODE>false</CODE> (it's assumed that if a file will be downloaded, then it 
295      * shouldn't be ignored).
296      * <p>
297      * Pseudocode to summarize the interaction of <CODE>overwrite</CODE> and 
298      * <CODE>ignoreExisting</CODE>: <PRE>
299      *     if (file exists) {
300      *        if (overwrite) {
301      *            download file over existing local copy
302      *            and pass it on to the next stage
303      *        } else {
304      *            if (ignoreExisting) {
305      *                skip this file
306      *            } else {
307      *                pass existing file on to the next stage
308      *            }
309      *        }
310      *     } else {
311      *        download new file 
312      *        and pass it on to the next stage
313      *     }
314      * </PRE>
315      */
316     public static class FileSpec {
317         /**
318          * Enumeration of legal FTP file tranfer types
319          */
320         public enum FileType {
321             /**
322              * ASCII text transfer mode, with end of line conversion.
323              */
324             ASCII(FTPClient.ASCII_FILE_TYPE),
325             /**
326              * Binary transfer mode, no changes made to data stream.
327              */
328                     BINARY(FTPClient.BINARY_FILE_TYPE);
329             
330             private int type;
331             
332             private FileType(int type) {
333                 this.type = type;
334             }
335             
336             /**
337              * Get the integer value of the FTP transfer mode enumeration.
338              * @return the integer equivalent to the FTP transfer mode setting
339              */
340             public int intValue() {
341                 return this.type;
342             }
343         }
344         
345         /** Holds value of property path. */
346         private String path = "/";
347         
348         /** Holds flag that determines whether or not to perform recursive search of the specified path */
349         private boolean recursive;
350         
351         // Holds flag that determines whether or not to overwrite local files
352         private boolean overwrite = false;
353 
354         /**
355          * Holds flag that determines if existing files are passed to the next stage.
356          */
357         private boolean ignoreExisting = false;
358         
359         // Type of file (ascii or binary)
360         private FileType type = FileType.BINARY;
361         
362         // List of criteria that the retrieved file must satisfy.
363         private Set<Criterion> criteria = new HashSet<Criterion>();
364         
365         /** Getter for property path.
366          * @return Value of property path.
367          *
368          */
369         public String getPath() {
370             return this.path;
371         }
372         
373         /** Setter for property path.
374          * @param path New value of property path.
375          *
376          */
377         public void setPath(String path) {
378             this.path = path;
379         }
380         
381         /** Getter for property pattern.
382          * @return Value of property pattern.
383          * @deprecated - not retrievable from criterion
384          */
385         public String getPattern() {
386             return null;
387         }
388         
389         /** Setter for property pattern.
390          * @param pattern New value of property pattern.
391          *
392          */
393         public void setPattern(String pattern) {
394             this.criteria.add(new FileNameMatchCriterion(pattern));
395         }
396         
397         /**
398          * Add a criterion to the set of criteria that must be matched for files
399          * to be downloaded
400          * @param crit {@link Criterion} used to match desired files for download, typically a filename pattern
401          */
402         public void addCriterion(Criterion crit) {
403             this.criteria.add(crit);
404         }
405         
406         /**
407          * Sets the flag determining whether or not the stage will recursively
408          * traverse the directory tree to find files.
409          * @param recursive this value is <CODE>true</CODE> to recursively search the remote directories for matches to
410          * the criterion, <CODE>false</CODE> to turn off recursive searching
411          */
412         public void setRecursive(boolean recursive) {
413             this.recursive = recursive;
414         }
415         
416         /**
417          * Returns whether or not the stage will recursively
418          * traverse the directory tree to find files.
419          * @return the current recursive search setting
420          */
421         public boolean isRecursive() {
422             return this.recursive;
423         }
424         
425         /**
426          * Sets the file type for the transfer. Legal values are "ascii" and "binary".
427          * Binary transfers are the default.
428          * @param fileType the FTP transfer type to use, "<CODE>ascii</CODE>" or "<CODE>binary</CODE>"
429          */
430         public void setFileType(String fileType) {
431             if ("ascii".equalsIgnoreCase(fileType)) {
432                 this.type = FileType.ASCII;
433             } else {
434                 this.type = FileType.BINARY;
435             }
436         }
437         
438         /**
439          * Returns the file type for the transfer.
440          * @return the current FTP transfer type setting
441          */
442         public String getFileType() {
443             return this.type.toString();
444         }
445 
446         /**
447          * Getter for property overwrite. The default value for this flag is 
448          * <CODE>false</CODE>, so existing local files will not be replaced by downloading
449          * remote files. This flag should be set to <CODE>true</CODE> if it is expected
450          * that the remote file is periodically updated and the local file is and out of
451          * date copy from a previous run of this pipeline.
452          * @return Value of property overwrite.
453          */
454         public boolean isOverwrite() {
455             return this.overwrite;
456         }
457 
458         /**
459          * Setter for property overwrite.
460          * @param overwrite New value of property overwrite.
461          */
462         public void setOverwrite(boolean overwrite) {
463             this.overwrite = overwrite;
464         }
465 
466         /**
467          * Getter for property ignoreExisting. The default value for this flag is 
468          * <CODE>false</CODE>, so existing files that aren't downloaded are still passed
469          * on to the next stage.
470          * @return Value of property ignoreExisting.
471          */
472         public boolean isIgnoreExisting() {
473             return this.ignoreExisting;
474         }
475 
476         /**
477          * Setter for property ignoreExisting.
478          * @param ignoreExisting New value of property ignoreExisting.
479          */
480         public void setIgnoreExisting(boolean ignoreExisting) {
481             this.ignoreExisting = ignoreExisting;
482         }
483     }
484     
485     /**
486      * This class is used to specify a criterion that the downloaded file
487      * must satisfy.
488      */
489     public interface Criterion {
490         /**
491          * Interface defining matches for FTP file downloading. Those remote files that
492          * match the criterion will be downloaded.
493          * @param file file to compare criterion to
494          * @return <CODE>true</CODE> if the file meets the Criterion, <CODE>false</CODE> otherwise
495          */
496         public boolean matches(FTPFile file);
497     }
498     
499     /**
500      * Matches file names based upon the Java regex supplied in the constructor.
501      */
502     public static class FileNameMatchCriterion implements Criterion {
503         // precompiled pattern used to match filenames
504         private Pattern pattern;
505         private String _pattern;
506         
507         /**
508          * Construct a new criterion to match on file names.
509          * @param pattern Java regex pattern specifying acceptable file names
510          */
511         public FileNameMatchCriterion(String pattern) {
512             this._pattern = pattern;
513             this.pattern = Pattern.compile(pattern);
514         }
515         
516         /**
517          * Test the given file's name against this criterion.
518          * @param file file to compare to
519          * @return <CODE>true</CODE> if the filename matches the filename pattern of this criterion,
520          * <CODE>false</CODE> otherwise
521          */
522         public boolean matches(FTPFile file) {
523             return pattern.matcher(file.getName()).matches();
524         }
525         
526         /**
527          * Printable version of this Criterion indicating the Java regex used for filename
528          * matching.
529          * @return a string containing the regex used to construct this filename criterion
530          */
531         public String toString() {
532             return "filename matches pattern " + _pattern;
533         }
534     }
535     
536     /**
537      * Matches files by matching their filesystem timestamp to a date range.
538      */
539     public static class FileDateMatchCriterion implements Criterion {
540         private Date startDate;
541         private Date endDate;
542         
543         /**
544          * Construct a new criterion to match file timestamp to a range of dates.
545          * @param startDate starting date (inclusive) of the date range
546          * @param endDate ending date (inclusive) of the date range
547          */
548         public FileDateMatchCriterion(Date startDate, Date endDate) {
549             this.startDate = startDate;
550             this.endDate = endDate;
551         }
552         
553         /**
554          * Test the given file's date against this criterion.
555          * @param file file to compare to
556          * @return <CODE>true</CODE> if the file date falls into the time window of 
557          * [startDate, endDate], <CODE>false</CODE> otherwise
558          */
559         public boolean matches(FTPFile file) {
560             Calendar cal = file.getTimestamp();
561             if ((startDate != null && cal.getTime().before(startDate)) || (endDate != null && cal.getTime().after(endDate))) {
562                 return false;
563             } else {
564                 return true;
565             }
566         }
567         
568         /**
569          * Printable version of this Criterion indicating the inclusive date range used
570          * for file date matching.
571          * @return a string noting the startDate and endDate
572          */
573         public String toString() {
574             return "file date is between " + startDate + " and " + endDate;
575         }
576     }
577 }