001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.commons.pipeline.stage;
019    
020    import java.io.File;
021    import java.io.FileOutputStream;
022    import java.io.IOException;
023    import java.io.OutputStream;
024    import java.util.Calendar;
025    import java.util.Date;
026    import java.util.HashSet;
027    import java.util.regex.Pattern;
028    import java.util.Set;
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    import org.apache.commons.net.ftp.FTPClient;
032    import org.apache.commons.net.ftp.FTPFile;
033    import org.apache.commons.net.ftp.FTPReply;
034    import org.apache.commons.pipeline.StageException;
035    
036    /**
037     * <p>This {@link org.apache.commons.pipeline.Pipeline$Stage Stage} provides the
038     * functionality needed to retrieve data from an FTP URL. Multipart responses
039     * are not yet supported.</p>
040     */
041    public class FtpFileDownloadStage extends BaseStage {
042        private final Log log = LogFactory.getLog(FtpFileDownloadStage.class);
043        
044        private String workDir = "/tmp";
045        private File fworkDir;
046        private FTPClient client = new FTPClient();
047        
048        /** Holds value of property host. */
049        private String host;
050        
051        /** Holds value of property user. */
052        private String user;
053        
054        /** Holds value of property password. */
055        private String password;
056        
057        /** Holds value of property port.     */
058        private int port;
059    
060        /**
061         * Default constructor - creates work directory in /tmp
062         */
063        public FtpFileDownloadStage() {
064        }
065        
066        /**
067         * Constructor specifying work directory.
068         * @param workDir local directory in which to store downloaded files
069         */
070        public FtpFileDownloadStage(String workDir) {
071            this.workDir = workDir;
072        }
073        
074        /**
075         * Creates the download directory {@link #setWorkDir(String) workDir} uf it does
076         * not exist and makes a connection to the remote FTP server.
077         * @throws org.apache.commons.pipeline.StageException if a connection to the remote FTP server cannot be established, or the login to
078         * the remote system fails
079         */
080        public void preprocess() throws StageException {
081            super.preprocess();
082            if (fworkDir == null) fworkDir = new File(workDir);
083            if (!this.fworkDir.exists()) fworkDir.mkdirs();
084            
085            try {
086                //connect to the ftp site
087                client.connect(host, port);
088                log.debug(client.getReplyString());
089                if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
090                    throw new IOException("FTP server at host " + host + " refused connection.");
091                }
092                
093                client.login(user, password);
094                log.debug(client.getReplyString());
095                if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
096                    throw new StageException(this, "FTP login failed for user " + user + ": " + client.getReplyString());
097                }
098            } catch (IOException e) {
099                throw new StageException(this, e);
100            }
101        }
102        
103        /**
104         * Retrieves files that match the specified FileSpec from the FTP server
105         * and stores them in the work directory.
106         * @param obj incoming {@link FileSpec} that indicates the file to download along with some flags to
107         * control the download behavior
108         * @throws org.apache.commons.pipeline.StageException if there are errors navigating the remote directory structure or file download 
109         * fails
110         */
111        public void process(Object obj) throws StageException {
112            if (!this.fworkDir.exists()) throw new StageException(this, "The work directory for file download " + workDir.toString() + " does not exist.");
113            
114            FileSpec spec = (FileSpec) obj;
115            
116            try {
117                client.setFileType(spec.type.intValue());
118                client.changeWorkingDirectory(spec.path);
119                if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
120                    throw new IOException("FTP client could not change to remote directory " + spec.path + ": " + client.getReplyString());
121                }
122                
123                log.debug("FTP connection successfully established to " + host + ":" + spec.path);
124                
125                //get the list of files
126                client.enterLocalPassiveMode();
127                searchCurrentDirectory("", spec);
128            } catch (IOException e) {
129                throw new StageException(this, e);
130            }
131        }
132        
133        
134        /**
135         * Search the current working directory of the FTP client, saving files
136         * to the path specified by workDir + the path to the file on the FTP server.
137         * This method will optionally recursively search directories on the remote server.
138         */
139        private void searchCurrentDirectory(String path, FileSpec spec) throws IOException {
140            FTPFile[] files = client.listFiles();
141            if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
142                throw new IOException("FTP client could not obtain file list : " + client.getReplyString());
143            }
144            
145            search: for (FTPFile file : files) {
146                String localPath = path + File.separatorChar + file.getName();
147                
148                if (file.isDirectory() && spec.recursive) {
149                    log.debug("Recursing into directory " + file.getName());
150                    client.changeWorkingDirectory(file.getName());
151                    searchCurrentDirectory(localPath, spec);
152                    client.changeToParentDirectory();
153                } else {
154                    log.debug("Examining file " + localPath);
155                    for (Criterion crit : spec.criteria) {
156                        if (!crit.matches(file)) {
157                            log.info("File " + localPath + " failed criterion check " + crit);
158                            continue search;
159                        }
160                    }
161                    
162                    boolean getFile = true;
163                    File localFile = new File(workDir + File.separatorChar + localPath);
164                    if (localFile.exists()) {
165                        if (spec.overwrite) {
166                            log.info("Replacing existing local file " + localFile.getPath());
167                            getFile = true;
168                        } else {
169                            if (spec.ignoreExisting) {
170                                log.info("Ignoring existing local file " + localFile.getPath());
171                                continue search;
172                    } else {
173                                log.info("Using existing local file " + localFile.getPath());
174                                getFile = false;
175                            }
176                        }
177                    } else {
178                        getFile = true;
179                    }
180                    
181                    if (getFile) {
182                        if (! localFile.getParentFile().exists()) localFile.getParentFile().mkdir();
183                        
184                        OutputStream out = new FileOutputStream(localFile);
185                        try {
186                            client.retrieveFile(file.getName(), out);
187                        } finally {
188                            out.flush();
189                            out.close();
190                        }
191                    }
192                    
193                    this.emit(localFile);
194                }
195            }
196        }
197        
198        /**
199         * Disconnects from FTP server. Errors are logged.
200         */
201        public void release() {
202            try {
203                client.disconnect(); //close ftp connection
204            } catch (IOException e) {
205                log.error(e.getMessage(), e);
206            }
207        }
208        
209        /**
210         * Sets the working directory for the file download. If the directory does
211         * not already exist, it will be created during the preprocess() step.
212         * @param workDir local directory to receive file downloads
213         */
214        public void setWorkDir(String workDir) {
215            this.workDir = workDir;
216        }
217        
218        /**
219         * Returns the name of the file download directory.
220         * @return the string containing the local working directory
221         */
222        public String getWorkDir() {
223            return this.workDir;
224        }
225        
226        /** Getter for property host.
227         * @return Value of property host.
228         *
229         */
230        public String getHost() {
231            return this.host;
232        }
233        
234        /** Setter for property host.
235         * @param host New value of property host.
236         *
237         */
238        public void setHost(String host) {
239            this.host = host;
240        }
241        
242        /** Getter for property user.
243         * @return Value of property user.
244         *
245         */
246        public String getUser() {
247            return this.user;
248        }
249        
250        /** Setter for property user.
251         * @param user New value of property user.
252         *
253         */
254        public void setUser(String user) {
255            this.user = user;
256        }
257        
258        /** Setter for property password.
259         * @param password New value of property password.
260         *
261         */
262        public void setPassword(String password) {
263            this.password = password;
264        }
265        
266        /**
267         * Getter for property port.
268         * @return Value of property port.
269         */
270        public int getPort() {
271            return this.port;
272        }
273    
274        /**
275         * Setter for property port.
276         * @param port New value of property port.
277         */
278        public void setPort(int port) {
279            this.port = port;
280        }
281        
282        /**
283         * This class is used to specify a path and pattern of file for the FtpFileDownload
284         * to retrieve. There are some parameters that can be configured in the filespec
285         * that will control download behavior for <CODE>recursive</CODE> searching, the 
286         * <CODE>overwrite</CODE> of locally existing files, and to 
287         * <CODE>ignoreExisting</CODE> files.
288         * <p>
289         * If a file already exists in the local directory, it is only replaced if 
290         * <CODE>overwrite</CODE> is set to <CODE>true</CODE>. If it is replaced, then the
291         * filename is passed on to the next stage. Existing files are passed on to the
292         * stage unless <CODE>ignoreExisting</CODE> is <CODE>true</CODE>. Note that the
293         * <CODE>ignoreExisting</CODE> flag is only used if <CODE>overwrite</CODE> is 
294         * <CODE>false</CODE> (it's assumed that if a file will be downloaded, then it 
295         * shouldn't be ignored).
296         * <p>
297         * Pseudocode to summarize the interaction of <CODE>overwrite</CODE> and 
298         * <CODE>ignoreExisting</CODE>: <PRE>
299         *     if (file exists) {
300         *        if (overwrite) {
301         *            download file over existing local copy
302         *            and pass it on to the next stage
303         *        } else {
304         *            if (ignoreExisting) {
305         *                skip this file
306         *            } else {
307         *                pass existing file on to the next stage
308         *            }
309         *        }
310         *     } else {
311         *        download new file 
312         *        and pass it on to the next stage
313         *     }
314         * </PRE>
315         */
316        public static class FileSpec {
317            /**
318             * Enumeration of legal FTP file tranfer types
319             */
320            public enum FileType {
321                /**
322                 * ASCII text transfer mode, with end of line conversion.
323                 */
324                ASCII(FTPClient.ASCII_FILE_TYPE),
325                /**
326                 * Binary transfer mode, no changes made to data stream.
327                 */
328                        BINARY(FTPClient.BINARY_FILE_TYPE);
329                
330                private int type;
331                
332                private FileType(int type) {
333                    this.type = type;
334                }
335                
336                /**
337                 * Get the integer value of the FTP transfer mode enumeration.
338                 * @return the integer equivalent to the FTP transfer mode setting
339                 */
340                public int intValue() {
341                    return this.type;
342                }
343            }
344            
345            /** Holds value of property path. */
346            private String path = "/";
347            
348            /** Holds flag that determines whether or not to perform recursive search of the specified path */
349            private boolean recursive;
350            
351            // Holds flag that determines whether or not to overwrite local files
352            private boolean overwrite = false;
353    
354            /**
355             * Holds flag that determines if existing files are passed to the next stage.
356             */
357            private boolean ignoreExisting = false;
358            
359            // Type of file (ascii or binary)
360            private FileType type = FileType.BINARY;
361            
362            // List of criteria that the retrieved file must satisfy.
363            private Set<Criterion> criteria = new HashSet<Criterion>();
364            
365            /** Getter for property path.
366             * @return Value of property path.
367             *
368             */
369            public String getPath() {
370                return this.path;
371            }
372            
373            /** Setter for property path.
374             * @param path New value of property path.
375             *
376             */
377            public void setPath(String path) {
378                this.path = path;
379            }
380            
381            /** Getter for property pattern.
382             * @return Value of property pattern.
383             * @deprecated - not retrievable from criterion
384             */
385            public String getPattern() {
386                return null;
387            }
388            
389            /** Setter for property pattern.
390             * @param pattern New value of property pattern.
391             *
392             */
393            public void setPattern(String pattern) {
394                this.criteria.add(new FileNameMatchCriterion(pattern));
395            }
396            
397            /**
398             * Add a criterion to the set of criteria that must be matched for files
399             * to be downloaded
400             * @param crit {@link Criterion} used to match desired files for download, typically a filename pattern
401             */
402            public void addCriterion(Criterion crit) {
403                this.criteria.add(crit);
404            }
405            
406            /**
407             * Sets the flag determining whether or not the stage will recursively
408             * traverse the directory tree to find files.
409             * @param recursive this value is <CODE>true</CODE> to recursively search the remote directories for matches to
410             * the criterion, <CODE>false</CODE> to turn off recursive searching
411             */
412            public void setRecursive(boolean recursive) {
413                this.recursive = recursive;
414            }
415            
416            /**
417             * Returns whether or not the stage will recursively
418             * traverse the directory tree to find files.
419             * @return the current recursive search setting
420             */
421            public boolean isRecursive() {
422                return this.recursive;
423            }
424            
425            /**
426             * Sets the file type for the transfer. Legal values are "ascii" and "binary".
427             * Binary transfers are the default.
428             * @param fileType the FTP transfer type to use, "<CODE>ascii</CODE>" or "<CODE>binary</CODE>"
429             */
430            public void setFileType(String fileType) {
431                if ("ascii".equalsIgnoreCase(fileType)) {
432                    this.type = FileType.ASCII;
433                } else {
434                    this.type = FileType.BINARY;
435                }
436            }
437            
438            /**
439             * Returns the file type for the transfer.
440             * @return the current FTP transfer type setting
441             */
442            public String getFileType() {
443                return this.type.toString();
444            }
445    
446            /**
447             * Getter for property overwrite. The default value for this flag is 
448             * <CODE>false</CODE>, so existing local files will not be replaced by downloading
449             * remote files. This flag should be set to <CODE>true</CODE> if it is expected
450             * that the remote file is periodically updated and the local file is and out of
451             * date copy from a previous run of this pipeline.
452             * @return Value of property overwrite.
453             */
454            public boolean isOverwrite() {
455                return this.overwrite;
456            }
457    
458            /**
459             * Setter for property overwrite.
460             * @param overwrite New value of property overwrite.
461             */
462            public void setOverwrite(boolean overwrite) {
463                this.overwrite = overwrite;
464            }
465    
466            /**
467             * Getter for property ignoreExisting. The default value for this flag is 
468             * <CODE>false</CODE>, so existing files that aren't downloaded are still passed
469             * on to the next stage.
470             * @return Value of property ignoreExisting.
471             */
472            public boolean isIgnoreExisting() {
473                return this.ignoreExisting;
474            }
475    
476            /**
477             * Setter for property ignoreExisting.
478             * @param ignoreExisting New value of property ignoreExisting.
479             */
480            public void setIgnoreExisting(boolean ignoreExisting) {
481                this.ignoreExisting = ignoreExisting;
482            }
483        }
484        
485        /**
486         * This class is used to specify a criterion that the downloaded file
487         * must satisfy.
488         */
489        public interface Criterion {
490            /**
491             * Interface defining matches for FTP file downloading. Those remote files that
492             * match the criterion will be downloaded.
493             * @param file file to compare criterion to
494             * @return <CODE>true</CODE> if the file meets the Criterion, <CODE>false</CODE> otherwise
495             */
496            public boolean matches(FTPFile file);
497        }
498        
499        /**
500         * Matches file names based upon the Java regex supplied in the constructor.
501         */
502        public static class FileNameMatchCriterion implements Criterion {
503            // precompiled pattern used to match filenames
504            private Pattern pattern;
505            private String _pattern;
506            
507            /**
508             * Construct a new criterion to match on file names.
509             * @param pattern Java regex pattern specifying acceptable file names
510             */
511            public FileNameMatchCriterion(String pattern) {
512                this._pattern = pattern;
513                this.pattern = Pattern.compile(pattern);
514            }
515            
516            /**
517             * Test the given file's name against this criterion.
518             * @param file file to compare to
519             * @return <CODE>true</CODE> if the filename matches the filename pattern of this criterion,
520             * <CODE>false</CODE> otherwise
521             */
522            public boolean matches(FTPFile file) {
523                return pattern.matcher(file.getName()).matches();
524            }
525            
526            /**
527             * Printable version of this Criterion indicating the Java regex used for filename
528             * matching.
529             * @return a string containing the regex used to construct this filename criterion
530             */
531            public String toString() {
532                return "filename matches pattern " + _pattern;
533            }
534        }
535        
536        /**
537         * Matches files by matching their filesystem timestamp to a date range.
538         */
539        public static class FileDateMatchCriterion implements Criterion {
540            private Date startDate;
541            private Date endDate;
542            
543            /**
544             * Construct a new criterion to match file timestamp to a range of dates.
545             * @param startDate starting date (inclusive) of the date range
546             * @param endDate ending date (inclusive) of the date range
547             */
548            public FileDateMatchCriterion(Date startDate, Date endDate) {
549                this.startDate = startDate;
550                this.endDate = endDate;
551            }
552            
553            /**
554             * Test the given file's date against this criterion.
555             * @param file file to compare to
556             * @return <CODE>true</CODE> if the file date falls into the time window of 
557             * [startDate, endDate], <CODE>false</CODE> otherwise
558             */
559            public boolean matches(FTPFile file) {
560                Calendar cal = file.getTimestamp();
561                if ((startDate != null && cal.getTime().before(startDate)) || (endDate != null && cal.getTime().after(endDate))) {
562                    return false;
563                } else {
564                    return true;
565                }
566            }
567            
568            /**
569             * Printable version of this Criterion indicating the inclusive date range used
570             * for file date matching.
571             * @return a string noting the startDate and endDate
572             */
573            public String toString() {
574                return "file date is between " + startDate + " and " + endDate;
575            }
576        }
577    }