1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.commons.pipeline.stage;
19
20 import java.io.File;
21 import java.io.FileOutputStream;
22 import java.io.IOException;
23 import java.io.OutputStream;
24 import java.util.Calendar;
25 import java.util.Date;
26 import java.util.HashSet;
27 import java.util.regex.Pattern;
28 import java.util.Set;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.commons.net.ftp.FTPClient;
32 import org.apache.commons.net.ftp.FTPFile;
33 import org.apache.commons.net.ftp.FTPReply;
34 import org.apache.commons.pipeline.StageException;
35
36
37
38
39
40
41 public class FtpFileDownloadStage extends BaseStage {
42 private final Log log = LogFactory.getLog(FtpFileDownloadStage.class);
43
44 private String workDir = "/tmp";
45 private File fworkDir;
46 private FTPClient client = new FTPClient();
47
48
49 private String host;
50
51
52 private String user;
53
54
55 private String password;
56
57
58 private int port;
59
60
61
62
63 public FtpFileDownloadStage() {
64 }
65
66
67
68
69
70 public FtpFileDownloadStage(String workDir) {
71 this.workDir = workDir;
72 }
73
74
75
76
77
78
79
80 public void preprocess() throws StageException {
81 super.preprocess();
82 if (fworkDir == null) fworkDir = new File(workDir);
83 if (!this.fworkDir.exists()) fworkDir.mkdirs();
84
85 try {
86
87 client.connect(host, port);
88 log.debug(client.getReplyString());
89 if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
90 throw new IOException("FTP server at host " + host + " refused connection.");
91 }
92
93 client.login(user, password);
94 log.debug(client.getReplyString());
95 if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
96 throw new StageException(this, "FTP login failed for user " + user + ": " + client.getReplyString());
97 }
98 } catch (IOException e) {
99 throw new StageException(this, e);
100 }
101 }
102
103
104
105
106
107
108
109
110
111 public void process(Object obj) throws StageException {
112 if (!this.fworkDir.exists()) throw new StageException(this, "The work directory for file download " + workDir.toString() + " does not exist.");
113
114 FileSpec spec = (FileSpec) obj;
115
116 try {
117 client.setFileType(spec.type.intValue());
118 client.changeWorkingDirectory(spec.path);
119 if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
120 throw new IOException("FTP client could not change to remote directory " + spec.path + ": " + client.getReplyString());
121 }
122
123 log.debug("FTP connection successfully established to " + host + ":" + spec.path);
124
125
126 client.enterLocalPassiveMode();
127 searchCurrentDirectory("", spec);
128 } catch (IOException e) {
129 throw new StageException(this, e);
130 }
131 }
132
133
134
135
136
137
138
139 private void searchCurrentDirectory(String path, FileSpec spec) throws IOException {
140 FTPFile[] files = client.listFiles();
141 if(!FTPReply.isPositiveCompletion(client.getReplyCode())) {
142 throw new IOException("FTP client could not obtain file list : " + client.getReplyString());
143 }
144
145 search: for (FTPFile file : files) {
146 String localPath = path + File.separatorChar + file.getName();
147
148 if (file.isDirectory() && spec.recursive) {
149 log.debug("Recursing into directory " + file.getName());
150 client.changeWorkingDirectory(file.getName());
151 searchCurrentDirectory(localPath, spec);
152 client.changeToParentDirectory();
153 } else {
154 log.debug("Examining file " + localPath);
155 for (Criterion crit : spec.criteria) {
156 if (!crit.matches(file)) {
157 log.info("File " + localPath + " failed criterion check " + crit);
158 continue search;
159 }
160 }
161
162 boolean getFile = true;
163 File localFile = new File(workDir + File.separatorChar + localPath);
164 if (localFile.exists()) {
165 if (spec.overwrite) {
166 log.info("Replacing existing local file " + localFile.getPath());
167 getFile = true;
168 } else {
169 if (spec.ignoreExisting) {
170 log.info("Ignoring existing local file " + localFile.getPath());
171 continue search;
172 } else {
173 log.info("Using existing local file " + localFile.getPath());
174 getFile = false;
175 }
176 }
177 } else {
178 getFile = true;
179 }
180
181 if (getFile) {
182 if (! localFile.getParentFile().exists()) localFile.getParentFile().mkdir();
183
184 OutputStream out = new FileOutputStream(localFile);
185 try {
186 client.retrieveFile(file.getName(), out);
187 } finally {
188 out.flush();
189 out.close();
190 }
191 }
192
193 this.emit(localFile);
194 }
195 }
196 }
197
198
199
200
201 public void release() {
202 try {
203 client.disconnect();
204 } catch (IOException e) {
205 log.error(e.getMessage(), e);
206 }
207 }
208
209
210
211
212
213
214 public void setWorkDir(String workDir) {
215 this.workDir = workDir;
216 }
217
218
219
220
221
222 public String getWorkDir() {
223 return this.workDir;
224 }
225
226
227
228
229
230 public String getHost() {
231 return this.host;
232 }
233
234
235
236
237
238 public void setHost(String host) {
239 this.host = host;
240 }
241
242
243
244
245
246 public String getUser() {
247 return this.user;
248 }
249
250
251
252
253
254 public void setUser(String user) {
255 this.user = user;
256 }
257
258
259
260
261
262 public void setPassword(String password) {
263 this.password = password;
264 }
265
266
267
268
269
270 public int getPort() {
271 return this.port;
272 }
273
274
275
276
277
278 public void setPort(int port) {
279 this.port = port;
280 }
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316 public static class FileSpec {
317
318
319
320 public enum FileType {
321
322
323
324 ASCII(FTPClient.ASCII_FILE_TYPE),
325
326
327
328 BINARY(FTPClient.BINARY_FILE_TYPE);
329
330 private int type;
331
332 private FileType(int type) {
333 this.type = type;
334 }
335
336
337
338
339
340 public int intValue() {
341 return this.type;
342 }
343 }
344
345
346 private String path = "/";
347
348
349 private boolean recursive;
350
351
352 private boolean overwrite = false;
353
354
355
356
357 private boolean ignoreExisting = false;
358
359
360 private FileType type = FileType.BINARY;
361
362
363 private Set<Criterion> criteria = new HashSet<Criterion>();
364
365
366
367
368
369 public String getPath() {
370 return this.path;
371 }
372
373
374
375
376
377 public void setPath(String path) {
378 this.path = path;
379 }
380
381
382
383
384
385 public String getPattern() {
386 return null;
387 }
388
389
390
391
392
393 public void setPattern(String pattern) {
394 this.criteria.add(new FileNameMatchCriterion(pattern));
395 }
396
397
398
399
400
401
402 public void addCriterion(Criterion crit) {
403 this.criteria.add(crit);
404 }
405
406
407
408
409
410
411
412 public void setRecursive(boolean recursive) {
413 this.recursive = recursive;
414 }
415
416
417
418
419
420
421 public boolean isRecursive() {
422 return this.recursive;
423 }
424
425
426
427
428
429
430 public void setFileType(String fileType) {
431 if ("ascii".equalsIgnoreCase(fileType)) {
432 this.type = FileType.ASCII;
433 } else {
434 this.type = FileType.BINARY;
435 }
436 }
437
438
439
440
441
442 public String getFileType() {
443 return this.type.toString();
444 }
445
446
447
448
449
450
451
452
453
454 public boolean isOverwrite() {
455 return this.overwrite;
456 }
457
458
459
460
461
462 public void setOverwrite(boolean overwrite) {
463 this.overwrite = overwrite;
464 }
465
466
467
468
469
470
471
472 public boolean isIgnoreExisting() {
473 return this.ignoreExisting;
474 }
475
476
477
478
479
480 public void setIgnoreExisting(boolean ignoreExisting) {
481 this.ignoreExisting = ignoreExisting;
482 }
483 }
484
485
486
487
488
489 public interface Criterion {
490
491
492
493
494
495
496 public boolean matches(FTPFile file);
497 }
498
499
500
501
502 public static class FileNameMatchCriterion implements Criterion {
503
504 private Pattern pattern;
505 private String _pattern;
506
507
508
509
510
511 public FileNameMatchCriterion(String pattern) {
512 this._pattern = pattern;
513 this.pattern = Pattern.compile(pattern);
514 }
515
516
517
518
519
520
521
522 public boolean matches(FTPFile file) {
523 return pattern.matcher(file.getName()).matches();
524 }
525
526
527
528
529
530
531 public String toString() {
532 return "filename matches pattern " + _pattern;
533 }
534 }
535
536
537
538
539 public static class FileDateMatchCriterion implements Criterion {
540 private Date startDate;
541 private Date endDate;
542
543
544
545
546
547
548 public FileDateMatchCriterion(Date startDate, Date endDate) {
549 this.startDate = startDate;
550 this.endDate = endDate;
551 }
552
553
554
555
556
557
558
559 public boolean matches(FTPFile file) {
560 Calendar cal = file.getTimestamp();
561 if ((startDate != null && cal.getTime().before(startDate)) || (endDate != null && cal.getTime().after(endDate))) {
562 return false;
563 } else {
564 return true;
565 }
566 }
567
568
569
570
571
572
573 public String toString() {
574 return "file date is between " + startDate + " and " + endDate;
575 }
576 }
577 }