001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.commons.pipeline.stage;
019    
020    import java.io.IOException;
021    import java.io.InputStream;
022    import java.net.MalformedURLException;
023    import java.net.URL;
024    import java.util.ArrayList;
025    import java.util.List;
026    import java.util.Queue;
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    import org.apache.commons.pipeline.stage.BaseStage;
030    import org.apache.commons.pipeline.StageException;
031    
032    /**
033     * Converts a URL into an InputStream.  This stage keeps track of all
034     * input streams that are created and closes them at the release step.
035     */
036    public class URLToInputStreamStage extends BaseStage {
037        private final Log log = LogFactory.getLog(URLToInputStreamStage.class);
038        private List<InputStream> inputStreams = new ArrayList<InputStream>();
039        
040        /** Creates a new instance of URLToInputStreamStage */
041        public URLToInputStreamStage() {    }
042        
043        /** 
044         * Takes a String or a URL object representing a URL and exqueues the input 
045         * stream returned by opening that URL.
046         *
047         * @param obj A String or URL object
048         */    
049        public void process(Object obj) throws org.apache.commons.pipeline.StageException {        
050            URL url = null;
051            if (obj instanceof URL){
052                url = (URL) obj;
053            } else if (obj instanceof String) {
054                String urlString = (String) obj;
055                try {
056                    url = new URL(urlString);
057                } catch (MalformedURLException e){
058                    throw new StageException(this, "Error converting url String:" + urlString, e);
059                }
060            }
061            
062            try {
063                InputStream inputStream = url.openStream();
064                this.inputStreams.add(inputStream);
065                log.info("enqueing input stream");
066                this.emit(inputStream);
067            } catch (IOException e){
068                throw new StageException(this, "Error with stream from url:" + url, e);
069            }
070        }
071        
072        /**
073         * Ensure that all opened input streams are closed.
074         */
075        public void release() {
076            log.info("running post process number of streams:" + inputStreams.size());
077            while(inputStreams.size() > 0){
078                InputStream is = (InputStream) inputStreams.remove(0);
079                try {
080                    is.close();
081                    log.info("closed stream");
082                } catch (IOException e){
083                    log.warn("Error closing stream",e);
084                }
085            }
086        }
087    }