001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.commons.pipeline.stage; 019 020 import java.io.IOException; 021 import java.io.InputStream; 022 import java.net.MalformedURLException; 023 import java.net.URL; 024 import java.util.ArrayList; 025 import java.util.List; 026 import java.util.Queue; 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 import org.apache.commons.pipeline.stage.BaseStage; 030 import org.apache.commons.pipeline.StageException; 031 032 /** 033 * Converts a URL into an InputStream. This stage keeps track of all 034 * input streams that are created and closes them at the release step. 035 */ 036 public class URLToInputStreamStage extends BaseStage { 037 private final Log log = LogFactory.getLog(URLToInputStreamStage.class); 038 private List<InputStream> inputStreams = new ArrayList<InputStream>(); 039 040 /** Creates a new instance of URLToInputStreamStage */ 041 public URLToInputStreamStage() { } 042 043 /** 044 * Takes a String or a URL object representing a URL and exqueues the input 045 * stream returned by opening that URL. 046 * 047 * @param obj A String or URL object 048 */ 049 public void process(Object obj) throws org.apache.commons.pipeline.StageException { 050 URL url = null; 051 if (obj instanceof URL){ 052 url = (URL) obj; 053 } else if (obj instanceof String) { 054 String urlString = (String) obj; 055 try { 056 url = new URL(urlString); 057 } catch (MalformedURLException e){ 058 throw new StageException(this, "Error converting url String:" + urlString, e); 059 } 060 } 061 062 try { 063 InputStream inputStream = url.openStream(); 064 this.inputStreams.add(inputStream); 065 log.info("enqueing input stream"); 066 this.emit(inputStream); 067 } catch (IOException e){ 068 throw new StageException(this, "Error with stream from url:" + url, e); 069 } 070 } 071 072 /** 073 * Ensure that all opened input streams are closed. 074 */ 075 public void release() { 076 log.info("running post process number of streams:" + inputStreams.size()); 077 while(inputStreams.size() > 0){ 078 InputStream is = (InputStream) inputStreams.remove(0); 079 try { 080 is.close(); 081 log.info("closed stream"); 082 } catch (IOException e){ 083 log.warn("Error closing stream",e); 084 } 085 } 086 } 087 }