001package org.apache.commons.jcs.engine; 002 003import java.util.ArrayList; 004import java.util.concurrent.BlockingQueue; 005import java.util.concurrent.ThreadPoolExecutor; 006 007/* 008 * Licensed to the Apache Software Foundation (ASF) under one 009 * or more contributor license agreements. See the NOTICE file 010 * distributed with this work for additional information 011 * regarding copyright ownership. The ASF licenses this file 012 * to you under the Apache License, Version 2.0 (the 013 * "License"); you may not use this file except in compliance 014 * with the License. You may obtain a copy of the License at 015 * 016 * http://www.apache.org/licenses/LICENSE-2.0 017 * 018 * Unless required by applicable law or agreed to in writing, 019 * software distributed under the License is distributed on an 020 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 021 * KIND, either express or implied. See the License for the 022 * specific language governing permissions and limitations 023 * under the License. 024 */ 025 026import org.apache.commons.jcs.engine.behavior.ICacheListener; 027import org.apache.commons.jcs.engine.stats.StatElement; 028import org.apache.commons.jcs.engine.stats.Stats; 029import org.apache.commons.jcs.engine.stats.behavior.IStatElement; 030import org.apache.commons.jcs.engine.stats.behavior.IStats; 031import org.apache.commons.jcs.utils.threadpool.ThreadPoolManager; 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034 035/** 036 * An event queue is used to propagate ordered cache events to one and only one target listener. 037 * <p> 038 * This is a modified version of the experimental version. It uses a PooledExecutor and a 039 * BoundedBuffer to queue up events and execute them as threads become available. 040 * <p> 041 * The PooledExecutor is static, because presumably these processes will be IO bound, so throwing 042 * more than a few threads at them will serve no purpose other than to saturate the IO interface. In 043 * light of this, having one thread per region seems unnecessary. This may prove to be false. 044 */ 045public class PooledCacheEventQueue<K, V> 046 extends AbstractCacheEventQueue<K, V> 047{ 048 /** The logger. */ 049 private static final Log log = LogFactory.getLog( PooledCacheEventQueue.class ); 050 051 /** The type of event queue */ 052 private static final QueueType queueType = QueueType.POOLED; 053 054 /** The Thread Pool to execute events with. */ 055 private ThreadPoolExecutor pool = null; 056 057 /** 058 * Constructor for the CacheEventQueue object 059 * <p> 060 * @param listener 061 * @param listenerId 062 * @param cacheName 063 * @param maxFailure 064 * @param waitBeforeRetry 065 * @param threadPoolName 066 */ 067 public PooledCacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure, 068 int waitBeforeRetry, String threadPoolName ) 069 { 070 initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, threadPoolName ); 071 } 072 073 /** 074 * Initializes the queue. 075 * <p> 076 * @param listener 077 * @param listenerId 078 * @param cacheName 079 * @param maxFailure 080 * @param waitBeforeRetry 081 * @param threadPoolName 082 */ 083 protected void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure, 084 int waitBeforeRetry, String threadPoolName ) 085 { 086 super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry); 087 088 // this will share the same pool with other event queues by default. 089 pool = ThreadPoolManager.getInstance().getPool( 090 (threadPoolName == null) ? "cache_event_queue" : threadPoolName ); 091 } 092 093 /** 094 * @return the queue type 095 */ 096 @Override 097 public QueueType getQueueType() 098 { 099 return queueType; 100 } 101 102 /** 103 * Destroy the queue. Interrupt all threads. 104 */ 105 @Override 106 public synchronized void destroy() 107 { 108 if ( isAlive() ) 109 { 110 setAlive(false); 111 pool.shutdownNow(); 112 if ( log.isInfoEnabled() ) 113 { 114 log.info( "Cache event queue destroyed: " + this ); 115 } 116 } 117 } 118 119 /** 120 * Adds an event to the queue. 121 * <p> 122 * @param event 123 */ 124 @Override 125 protected void put( AbstractCacheEvent event ) 126 { 127 pool.execute( event ); 128 } 129 130 /** 131 * @return IStats 132 */ 133 @Override 134 public IStats getStatistics() 135 { 136 IStats stats = new Stats(); 137 stats.setTypeName( "Pooled Cache Event Queue" ); 138 139 ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>(); 140 141 elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(super.isWorking()) ) ); 142 elems.add(new StatElement<Boolean>( "Alive", Boolean.valueOf(this.isAlive()) ) ); 143 elems.add(new StatElement<Boolean>( "Empty", Boolean.valueOf(this.isEmpty()) ) ); 144 145 if ( pool.getQueue() != null ) 146 { 147 BlockingQueue<Runnable> bb = pool.getQueue(); 148 elems.add(new StatElement<Integer>( "Queue Size", Integer.valueOf(bb.size()) ) ); 149 elems.add(new StatElement<Integer>( "Queue Capacity", Integer.valueOf(bb.remainingCapacity()) ) ); 150 } 151 152 elems.add(new StatElement<Integer>( "Pool Size", Integer.valueOf(pool.getPoolSize()) ) ); 153 elems.add(new StatElement<Integer>( "Maximum Pool Size", Integer.valueOf(pool.getMaximumPoolSize()) ) ); 154 155 stats.setStatElements( elems ); 156 157 return stats; 158 } 159 160 /** 161 * If the Queue is using a bounded channel we can determine the size. If it is zero or we can't 162 * determine the size, we return true. 163 * <p> 164 * @return whether or not there are items in the queue 165 */ 166 @Override 167 public boolean isEmpty() 168 { 169 if ( pool.getQueue() == null ) 170 { 171 return true; 172 } 173 else 174 { 175 return pool.getQueue().size() == 0; 176 } 177 } 178 179 /** 180 * Returns the number of elements in the queue. If the queue cannot determine the size 181 * accurately it will return 1. 182 * <p> 183 * @return number of items in the queue. 184 */ 185 @Override 186 public int size() 187 { 188 if ( pool.getQueue() == null ) 189 { 190 return 0; 191 } 192 else 193 { 194 return pool.getQueue().size(); 195 } 196 } 197}