1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.pipeline.stage;
19
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.net.MalformedURLException;
23 import java.net.URL;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Queue;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.commons.pipeline.stage.BaseStage;
30 import org.apache.commons.pipeline.StageException;
31
32 /**
33 * Converts a URL into an InputStream. This stage keeps track of all
34 * input streams that are created and closes them at the release step.
35 */
36 public class URLToInputStreamStage extends BaseStage {
37 private final Log log = LogFactory.getLog(URLToInputStreamStage.class);
38 private List<InputStream> inputStreams = new ArrayList<InputStream>();
39
40 /** Creates a new instance of URLToInputStreamStage */
41 public URLToInputStreamStage() { }
42
43 /**
44 * Takes a String or a URL object representing a URL and exqueues the input
45 * stream returned by opening that URL.
46 *
47 * @param obj A String or URL object
48 */
49 public void process(Object obj) throws org.apache.commons.pipeline.StageException {
50 URL url = null;
51 if (obj instanceof URL){
52 url = (URL) obj;
53 } else if (obj instanceof String) {
54 String urlString = (String) obj;
55 try {
56 url = new URL(urlString);
57 } catch (MalformedURLException e){
58 throw new StageException(this, "Error converting url String:" + urlString, e);
59 }
60 }
61
62 try {
63 InputStream inputStream = url.openStream();
64 this.inputStreams.add(inputStream);
65 log.info("enqueing input stream");
66 this.emit(inputStream);
67 } catch (IOException e){
68 throw new StageException(this, "Error with stream from url:" + url, e);
69 }
70 }
71
72 /**
73 * Ensure that all opened input streams are closed.
74 */
75 public void release() {
76 log.info("running post process number of streams:" + inputStreams.size());
77 while(inputStreams.size() > 0){
78 InputStream is = (InputStream) inputStreams.remove(0);
79 try {
80 is.close();
81 log.info("closed stream");
82 } catch (IOException e){
83 log.warn("Error closing stream",e);
84 }
85 }
86 }
87 }