001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.commons.pipeline.stage;
019
020 import java.io.IOException;
021 import java.io.InputStream;
022 import java.net.MalformedURLException;
023 import java.net.URL;
024 import java.util.ArrayList;
025 import java.util.List;
026 import java.util.Queue;
027 import org.apache.commons.logging.Log;
028 import org.apache.commons.logging.LogFactory;
029 import org.apache.commons.pipeline.stage.BaseStage;
030 import org.apache.commons.pipeline.StageException;
031
032 /**
033 * Converts a URL into an InputStream. This stage keeps track of all
034 * input streams that are created and closes them at the release step.
035 */
036 public class URLToInputStreamStage extends BaseStage {
037 private final Log log = LogFactory.getLog(URLToInputStreamStage.class);
038 private List<InputStream> inputStreams = new ArrayList<InputStream>();
039
040 /** Creates a new instance of URLToInputStreamStage */
041 public URLToInputStreamStage() { }
042
043 /**
044 * Takes a String or a URL object representing a URL and exqueues the input
045 * stream returned by opening that URL.
046 *
047 * @param obj A String or URL object
048 */
049 public void process(Object obj) throws org.apache.commons.pipeline.StageException {
050 URL url = null;
051 if (obj instanceof URL){
052 url = (URL) obj;
053 } else if (obj instanceof String) {
054 String urlString = (String) obj;
055 try {
056 url = new URL(urlString);
057 } catch (MalformedURLException e){
058 throw new StageException(this, "Error converting url String:" + urlString, e);
059 }
060 }
061
062 try {
063 InputStream inputStream = url.openStream();
064 this.inputStreams.add(inputStream);
065 log.info("enqueing input stream");
066 this.emit(inputStream);
067 } catch (IOException e){
068 throw new StageException(this, "Error with stream from url:" + url, e);
069 }
070 }
071
072 /**
073 * Ensure that all opened input streams are closed.
074 */
075 public void release() {
076 log.info("running post process number of streams:" + inputStreams.size());
077 while(inputStreams.size() > 0){
078 InputStream is = (InputStream) inputStreams.remove(0);
079 try {
080 is.close();
081 log.info("closed stream");
082 } catch (IOException e){
083 log.warn("Error closing stream",e);
084 }
085 }
086 }
087 }