001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.commons.pipeline.driver; 019 020 021 import org.apache.commons.pipeline.Stage; 022 import org.apache.commons.pipeline.StageContext; 023 import org.apache.commons.pipeline.StageDriver; 024 import org.apache.commons.pipeline.StageDriverFactory; 025 import org.apache.commons.pipeline.util.BlockingQueueFactory; 026 027 /** 028 * This factory is used to create {@link ThreadPoolStageDriver} instances configured 029 * to run specific stages. 030 */ 031 public class ThreadPoolStageDriverFactory implements StageDriverFactory { 032 033 private int numThreads = 1; 034 035 /** Creates a new instance of ThreadPoolStageDriverFactory */ 036 public ThreadPoolStageDriverFactory() { 037 } 038 039 /** 040 * Creates the new {@link ThreadPoolStageDriver} based upon the configuration 041 * of this factory instance 042 * @param stage The stage to be run by the newly created driver 043 * @param context The context in which the stage will be run 044 * @return the newly created driver 045 */ 046 public StageDriver createStageDriver(Stage stage, StageContext context) { 047 try { 048 return new ThreadPoolStageDriver(stage, context, queueFactory.createQueue(), timeout, faultTolerance, numThreads); 049 } catch (Exception e) { 050 throw new IllegalStateException("Instantiation of driver failed due to illegal factory state.", e); 051 } 052 } 053 054 /** 055 * Holds value of property queueFactory. 056 */ 057 private BlockingQueueFactory<?> queueFactory = new BlockingQueueFactory.LinkedBlockingQueueFactory(); 058 059 /** 060 * Getter for property queueFactory. 061 * @return Value of property queueFactory. 062 */ 063 public BlockingQueueFactory<?> getQueueFactory() { 064 return this.queueFactory; 065 } 066 067 /** 068 * Setter for property queueFactory. 069 * @param queueFactory New value of property queueFactory. 070 */ 071 public void setQueueFactory(BlockingQueueFactory<?> queueFactory) { 072 this.queueFactory = queueFactory; 073 } 074 075 /** 076 * Holds value of property timeout. 077 */ 078 private long timeout = 500; 079 080 /** 081 * Timeout for wait to ensure deadlock cannot occur on thread termination. 082 * Default is 500 083 * @return Value of property timeout. 084 */ 085 public long getTimeout() { 086 return this.timeout; 087 } 088 089 /** 090 * Setter for property timeout. 091 * @param timeout New value of property timeout. 092 */ 093 public void setTimeout(long timeout) { 094 this.timeout = timeout; 095 } 096 097 /** 098 * Holds value of property faultTolerance. 099 */ 100 private FaultTolerance faultTolerance = FaultTolerance.NONE; 101 102 /** 103 * Getter for property faultTolerance. See {@link FaultTolerance} for valid values 104 * and enumation meanings. 105 * @return Value of property faultTolerance. 106 */ 107 public FaultTolerance getFaultTolerance() { 108 return this.faultTolerance; 109 } 110 111 /** 112 * Setter for property faultTolerance. 113 * 114 * @param faultTolerance New value of property faultTolerance. 115 */ 116 public void setFaultTolerance(FaultTolerance faultTolerance) { 117 this.faultTolerance = faultTolerance; 118 } 119 120 /** 121 * Convenience setter for property faultTolerance for use by Digester. 122 * 123 * @param level New value of property level ("ALL","CHECKED", or "NONE"). 124 */ 125 public void setFaultToleranceLevel(String level) { 126 this.faultTolerance = FaultTolerance.valueOf(level); 127 } 128 129 /** 130 * Returns the number of threads that will be allocated to the thread 131 * pool of a driver created by this factory. 132 */ 133 public int getNumThreads() { 134 return numThreads; 135 } 136 137 /** 138 * Sets the number of threads that will be allocated to the thread 139 * pool of a driver created by this factory. 140 */ 141 public void setNumThreads(int numThreads) { 142 this.numThreads = numThreads; 143 } 144 145 }