/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.commons.pipeline.stage;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.regex.Pattern;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.pipeline.StageException;

/**
 * <p>This {@link org.apache.commons.pipeline.Pipeline$Stage Stage} provides the
 * functionality needed to retrieve data from an FTP URL. Multipart responses
 * are not yet supported.</p>
 *
 * <p>Each object fed to the stage must be a {@link FileSpec}. Every remote file
 * selected by the spec is stored below the work directory and the resulting
 * local {@link File} is emitted to the next stage.</p>
 */
public class FtpFileDownloadStage extends BaseStage {
    private final Log log = LogFactory.getLog(FtpFileDownloadStage.class);

    /** Name of the local directory that receives downloaded files. */
    private String workDir = "/tmp";

    /** {@link #workDir} as a File; (re)built by {@link #preprocess()}. */
    private File fworkDir;

    /** FTP client shared by all calls to {@link #process(Object)}. */
    private FTPClient client = new FTPClient();

    /** Holds value of property host. */
    private String host;

    /** Holds value of property user. */
    private String user;

    /** Holds value of property password. */
    private String password;

    /** Holds value of property port. Values &lt;= 0 select the FTP client's default port. */
    private int port;

    /**
     * Default constructor - creates work directory in /tmp
     */
    public FtpFileDownloadStage() {
    }

    /**
     * Constructor specifying work directory.
     * @param workDir local directory in which to store downloaded files
     */
    public FtpFileDownloadStage(String workDir) {
        this.workDir = workDir;
    }

    /**
     * Creates the download directory {@link #setWorkDir(String) workDir} if it does
     * not exist and makes a connection to the remote FTP server. If no port has
     * been configured (port &lt;= 0) the FTP client's default port is used.
     * The connection is closed again if the server refuses it or the login fails.
     * @throws org.apache.commons.pipeline.StageException if a connection to the remote FTP server cannot be established, or the login to
     * the remote system fails
     */
    public void preprocess() throws StageException {
        super.preprocess();

        // Rebuilt on every run so that a setWorkDir() call made after an
        // earlier run is not silently ignored.
        fworkDir = new File(workDir);
        if (!fworkDir.exists() && !fworkDir.mkdirs()) {
            // process() reports the missing directory as a StageException.
            log.warn("Unable to create work directory " + workDir);
        }

        try {
            //connect to the ftp site
            if (port > 0) {
                client.connect(host, port);
            } else {
                // port was never configured; let the client pick its default
                client.connect(host);
            }
            log.debug(client.getReplyString());
            if (!FTPReply.isPositiveCompletion(client.getReplyCode())) {
                throw new IOException("FTP server at host " + host + " refused connection.");
            }

            client.login(user, password);
            log.debug(client.getReplyString());
            if (!FTPReply.isPositiveCompletion(client.getReplyCode())) {
                // capture the reply before the connection is torn down
                String message = "FTP login failed for user " + user + ": " + client.getReplyString();
                disconnectQuietly();
                throw new StageException(this, message);
            }
        } catch (IOException e) {
            disconnectQuietly();
            throw new StageException(this, e);
        }
    }

    /**
     * Retrieves files that match the specified FileSpec from the FTP server
     * and stores them in the work directory.
     * @param obj incoming {@link FileSpec} that indicates the file to download along with some flags to
     * control the download behavior
     * @throws org.apache.commons.pipeline.StageException if the work directory does not exist, if there are errors
     * navigating the remote directory structure or file download fails
     */
    public void process(Object obj) throws StageException {
        // fworkDir is null when preprocess() has not been run
        if (fworkDir == null || !fworkDir.exists()) {
            throw new StageException(this, "The work directory for file download " + workDir + " does not exist.");
        }

        FileSpec spec = (FileSpec) obj;

        try {
            client.setFileType(spec.type.intValue());
            client.changeWorkingDirectory(spec.path);
            if (!FTPReply.isPositiveCompletion(client.getReplyCode())) {
                throw new IOException("FTP client could not change to remote directory " + spec.path + ": " + client.getReplyString());
            }

            log.debug("FTP connection successfully established to " + host + ":" + spec.path);

            //get the list of files
            client.enterLocalPassiveMode();
            searchCurrentDirectory("", spec);
        } catch (IOException e) {
            throw new StageException(this, e);
        }
    }

    /**
     * Search the current working directory of the FTP client, saving files
     * to the path specified by workDir + the path to the file on the FTP server.
     * This method will optionally recursively search directories on the remote server.
     * Directories are never downloaded as files: they are either descended into
     * (recursive spec) or skipped.
     *
     * @param path path of the current remote directory relative to the directory
     * named by the spec; empty for the starting directory
     * @param spec criteria and flags controlling the download
     * @throws IOException if the listing cannot be obtained, a directory change
     * fails, or a file cannot be retrieved or stored
     */
    private void searchCurrentDirectory(String path, FileSpec spec) throws IOException {
        FTPFile[] files = client.listFiles();
        if (!FTPReply.isPositiveCompletion(client.getReplyCode())) {
            throw new IOException("FTP client could not obtain file list : " + client.getReplyString());
        }

        for (FTPFile file : files) {
            if (file == null) {
                // listing entries the client could not parse
                continue;
            }

            String name = file.getName();
            String localPath = path + File.separatorChar + name;

            if (file.isDirectory()) {
                if (!spec.recursive) {
                    log.debug("Skipping directory " + localPath);
                    continue;
                }
                if (".".equals(name) || "..".equals(name)) {
                    // following these entries would recurse forever
                    continue;
                }

                log.debug("Recursing into directory " + name);
                if (!client.changeWorkingDirectory(name)) {
                    // carrying on would list the same directory again
                    throw new IOException("FTP client could not change to remote directory " + localPath + ": " + client.getReplyString());
                }
                searchCurrentDirectory(localPath, spec);
                if (!client.changeToParentDirectory()) {
                    throw new IOException("FTP client could not return to parent of remote directory " + localPath + ": " + client.getReplyString());
                }
                continue;
            }

            log.debug("Examining file " + localPath);
            if (!matchesAllCriteria(file, localPath, spec)) {
                continue;
            }

            File localFile = new File(workDir + File.separatorChar + localPath);
            boolean getFile = true;
            if (localFile.exists()) {
                if (spec.overwrite) {
                    log.info("Replacing existing local file " + localFile.getPath());
                } else if (spec.ignoreExisting) {
                    log.info("Ignoring existing local file " + localFile.getPath());
                    continue;
                } else {
                    log.info("Using existing local file " + localFile.getPath());
                    getFile = false;
                }
            }

            if (getFile) {
                downloadFile(name, localFile);
            }

            this.emit(localFile);
        }
    }

    /**
     * Checks a remote file against every criterion of the spec, logging the
     * first criterion that rejects it.
     *
     * @param file remote file to test
     * @param localPath relative path of the file, used for logging only
     * @param spec spec holding the criteria
     * @return <CODE>true</CODE> if no criterion rejects the file
     */
    private boolean matchesAllCriteria(FTPFile file, String localPath, FileSpec spec) {
        for (Criterion crit : spec.criteria) {
            if (!crit.matches(file)) {
                log.info("File " + localPath + " failed criterion check " + crit);
                return false;
            }
        }
        return true;
    }

    /**
     * Downloads a file from the current remote directory into the given local
     * file, creating missing local parent directories. If the transfer fails
     * the incomplete local file is removed so that it is not mistaken for a
     * good copy later on.
     *
     * @param remoteName name of the file in the current remote directory
     * @param localFile destination file
     * @throws IOException if the local directory cannot be created or the
     * transfer fails
     */
    private void downloadFile(String remoteName, File localFile) throws IOException {
        File parent = localFile.getParentFile();
        // mkdirs(): intermediate directories are missing when nothing was
        // downloaded from the enclosing remote directories.
        if (parent != null && !parent.exists() && !parent.mkdirs() && !parent.isDirectory()) {
            throw new IOException("Could not create local directory " + parent.getPath());
        }

        boolean retrieved = false;
        try {
            OutputStream out = new FileOutputStream(localFile);
            try {
                retrieved = client.retrieveFile(remoteName, out);
            } finally {
                out.close();
            }

            if (!retrieved) {
                throw new IOException("FTP client could not retrieve file " + remoteName + ": " + client.getReplyString());
            }
        } finally {
            if (!retrieved && localFile.exists() && !localFile.delete()) {
                log.warn("Could not remove incomplete download " + localFile.getPath());
            }
        }
    }

    /**
     * Closes the FTP connection after a failed setup. Errors are only logged
     * because the original failure is the one reported to the caller.
     */
    private void disconnectQuietly() {
        if (client.isConnected()) {
            try {
                client.disconnect();
            } catch (IOException e) {
                log.debug("Error closing FTP connection after failure", e);
            }
        }
    }

    /**
     * Disconnects from FTP server. Errors are logged. Does nothing if no
     * connection is open.
     */
    public void release() {
        if (!client.isConnected()) {
            return;
        }
        try {
            client.disconnect(); //close ftp connection
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Sets the working directory for the file download. If the directory does
     * not already exist, it will be created during the preprocess() step.
     * @param workDir local directory to receive file downloads
     */
    public void setWorkDir(String workDir) {
        this.workDir = workDir;
    }

    /**
     * Returns the name of the file download directory.
     * @return the string containing the local working directory
     */
    public String getWorkDir() {
        return this.workDir;
    }

    /**
     * Getter for property host.
     * @return Value of property host.
     */
    public String getHost() {
        return this.host;
    }

    /**
     * Setter for property host.
     * @param host New value of property host.
     */
    public void setHost(String host) {
        this.host = host;
    }

    /**
     * Getter for property user.
     * @return Value of property user.
     */
    public String getUser() {
        return this.user;
    }

    /**
     * Setter for property user.
     * @param user New value of property user.
     */
    public void setUser(String user) {
        this.user = user;
    }

    /**
     * Setter for property password.
     * @param password New value of property password.
     */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * Getter for property port.
     * @return Value of property port.
     */
    public int getPort() {
        return this.port;
    }

    /**
     * Setter for property port.
     * @param port New value of property port.
     */
    public void setPort(int port) {
        this.port = port;
    }

    /**
     * This class is used to specify a path and pattern of file for the FtpFileDownload
     * to retrieve. There are some parameters that can be configured in the filespec
     * that will control download behavior for <CODE>recursive</CODE> searching, the
     * <CODE>overwrite</CODE> of locally existing files, and to
     * <CODE>ignoreExisting</CODE> files.
     * <p>
     * If a file already exists in the local directory, it is only replaced if
     * <CODE>overwrite</CODE> is set to <CODE>true</CODE>.  If it is replaced, then the
     * filename is passed on to the next stage.  Existing files are passed on to the
     * stage unless <CODE>ignoreExisting</CODE> is <CODE>true</CODE>.  Note that the
     * <CODE>ignoreExisting</CODE> flag is only used if <CODE>overwrite</CODE> is
     * <CODE>false</CODE> (it's assumed that if a file will be downloaded, then it
     * shouldn't be ignored).
     * <p>
     * Pseudocode to summarize the interaction of <CODE>overwrite</CODE> and
     * <CODE>ignoreExisting</CODE>: <PRE>
     *   if (file exists) {
     *      if (overwrite) {
     *          download file over existing local copy
     *          and pass it on to the next stage
     *      } else {
     *          if (ignoreExisting) {
     *              skip this file
     *          } else {
     *              pass existing file on to the next stage
     *          }
     *      }
     *   } else {
     *      download new file
     *      and pass it on to the next stage
     *   }
     * </PRE>
     */
    public static class FileSpec {
        /**
         * Enumeration of legal FTP file transfer types
         */
        public enum FileType {
            /**
             * ASCII text transfer mode, with end of line conversion.
             */
            ASCII(FTPClient.ASCII_FILE_TYPE),
            /**
             * Binary transfer mode, no changes made to data stream.
             */
            BINARY(FTPClient.BINARY_FILE_TYPE);

            private final int type;

            private FileType(int type) {
                this.type = type;
            }

            /**
             * Get the integer value of the FTP transfer mode enumeration.
             * @return the integer equivalent to the FTP transfer mode setting
             */
            public int intValue() {
                return this.type;
            }
        }

        /** Holds value of property path. */
        private String path = "/";

        /** Holds flag that determines whether or not to perform recursive search of the specified path */
        private boolean recursive;

        /** Holds flag that determines whether or not to overwrite local files */
        private boolean overwrite = false;

        /**
         * Holds flag that determines if existing files are passed to the next stage.
         */
        private boolean ignoreExisting = false;

        /** Type of file (ascii or binary) */
        private FileType type = FileType.BINARY;

        /** Set of criteria that the retrieved file must satisfy. */
        private Set<Criterion> criteria = new HashSet<Criterion>();

        /**
         * Getter for property path.
         * @return Value of property path.
         */
        public String getPath() {
            return this.path;
        }

        /**
         * Setter for property path.
         * @param path New value of property path.
         */
        public void setPath(String path) {
            this.path = path;
        }

        /**
         * Getter for property pattern.
         * @return always <CODE>null</CODE>
         * @deprecated - not retrievable from criterion
         */
        @Deprecated
        public String getPattern() {
            return null;
        }

        /**
         * Setter for property pattern. Adds a {@link FileNameMatchCriterion}
         * for the pattern to the criteria; criteria added earlier are kept.
         * @param pattern New value of property pattern.
         */
        public void setPattern(String pattern) {
            this.criteria.add(new FileNameMatchCriterion(pattern));
        }

        /**
         * Add a criterion to the set of criteria that must be matched for files
         * to be downloaded
         * @param crit {@link Criterion} used to match desired files for download, typically a filename pattern
         */
        public void addCriterion(Criterion crit) {
            this.criteria.add(crit);
        }

        /**
         * Sets the flag determining whether or not the stage will recursively
         * traverse the directory tree to find files.
         * @param recursive this value is <CODE>true</CODE> to recursively search the remote directories for matches to
         * the criterion, <CODE>false</CODE> to turn off recursive searching
         */
        public void setRecursive(boolean recursive) {
            this.recursive = recursive;
        }

        /**
         * Returns whether or not the stage will recursively
         * traverse the directory tree to find files.
         * @return the current recursive search setting
         */
        public boolean isRecursive() {
            return this.recursive;
        }

        /**
         * Sets the file type for the transfer. Legal values are "ascii" and "binary".
         * Binary transfers are the default; any value other than "ascii"
         * (compared ignoring case) selects binary.
         * @param fileType the FTP transfer type to use, "<CODE>ascii</CODE>" or "<CODE>binary</CODE>"
         */
        public void setFileType(String fileType) {
            if ("ascii".equalsIgnoreCase(fileType)) {
                this.type = FileType.ASCII;
            } else {
                this.type = FileType.BINARY;
            }
        }

        /**
         * Returns the file type for the transfer.
         * @return the current FTP transfer type setting
         */
        public String getFileType() {
            return this.type.toString();
        }

        /**
         * Getter for property overwrite.  The default value for this flag is
         * <CODE>false</CODE>, so existing local files will not be replaced by downloading
         * remote files.  This flag should be set to <CODE>true</CODE> if it is expected
         * that the remote file is periodically updated and the local file is an out of
         * date copy from a previous run of this pipeline.
         * @return Value of property overwrite.
         */
        public boolean isOverwrite() {
            return this.overwrite;
        }

        /**
         * Setter for property overwrite.
         * @param overwrite New value of property overwrite.
         */
        public void setOverwrite(boolean overwrite) {
            this.overwrite = overwrite;
        }

        /**
         * Getter for property ignoreExisting.  The default value for this flag is
         * <CODE>false</CODE>, so existing files that aren't downloaded are still passed
         * on to the next stage.
         * @return Value of property ignoreExisting.
         */
        public boolean isIgnoreExisting() {
            return this.ignoreExisting;
        }

        /**
         * Setter for property ignoreExisting.
         * @param ignoreExisting New value of property ignoreExisting.
         */
        public void setIgnoreExisting(boolean ignoreExisting) {
            this.ignoreExisting = ignoreExisting;
        }
    }

    /**
     * This class is used to specify a criterion that the downloaded file
     * must satisfy.
     */
    public interface Criterion {
        /**
         * Interface defining matches for FTP file downloading.  Those remote files that
         * match the criterion will be downloaded.
         * @param file file to compare criterion to
         * @return <CODE>true</CODE> if the file meets the Criterion, <CODE>false</CODE> otherwise
         */
        public boolean matches(FTPFile file);
    }

    /**
     * Matches file names based upon the Java regex supplied in the constructor.
     */
    public static class FileNameMatchCriterion implements Criterion {
        // precompiled pattern used to match filenames
        private final Pattern pattern;

        /**
         * Construct a new criterion to match on file names.
         * @param pattern Java regex pattern specifying acceptable file names
         */
        public FileNameMatchCriterion(String pattern) {
            this.pattern = Pattern.compile(pattern);
        }

        /**
         * Test the given file's name against this criterion. The whole name
         * must match the pattern.
         * @param file file to compare to
         * @return <CODE>true</CODE> if the filename matches the filename pattern of this criterion,
         * <CODE>false</CODE> otherwise
         */
        public boolean matches(FTPFile file) {
            return pattern.matcher(file.getName()).matches();
        }

        /**
         * Printable version of this Criterion indicating the Java regex used for filename
         * matching.
         * @return a string containing the regex used to construct this filename criterion
         */
        @Override
        public String toString() {
            return "filename matches pattern " + pattern.pattern();
        }
    }

    /**
     * Matches files by matching their filesystem timestamp to a date range.
     */
    public static class FileDateMatchCriterion implements Criterion {
        private final Date startDate;
        private final Date endDate;

        /**
         * Construct a new criterion to match file timestamp to a range of dates.
         * @param startDate starting date (inclusive) of the date range, or
         * <CODE>null</CODE> for no lower bound
         * @param endDate ending date (inclusive) of the date range, or
         * <CODE>null</CODE> for no upper bound
         */
        public FileDateMatchCriterion(Date startDate, Date endDate) {
            this.startDate = startDate;
            this.endDate = endDate;
        }

        /**
         * Test the given file's date against this criterion. A file without a
         * timestamp only matches when neither bound is set.
         * @param file file to compare to
         * @return <CODE>true</CODE> if the file date falls into the time window of
         * [startDate, endDate], <CODE>false</CODE> otherwise
         */
        public boolean matches(FTPFile file) {
            Calendar cal = file.getTimestamp();
            if (cal == null) {
                // nothing to compare against a bound
                return startDate == null && endDate == null;
            }

            Date stamp = cal.getTime();
            if (startDate != null && stamp.before(startDate)) {
                return false;
            }
            if (endDate != null && stamp.after(endDate)) {
                return false;
            }
            return true;
        }

        /**
         * Printable version of this Criterion indicating the inclusive date range used
         * for file date matching.
         * @return a string noting the startDate and endDate
         */
        @Override
        public String toString() {
            return "file date is between " + startDate + " and " + endDate;
        }
    }
}