001package org.apache.commons.jcs3.engine; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.util.ArrayList; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.ExecutorService; 025import java.util.concurrent.ThreadPoolExecutor; 026 027import org.apache.commons.jcs3.engine.behavior.ICacheListener; 028import org.apache.commons.jcs3.engine.stats.StatElement; 029import org.apache.commons.jcs3.engine.stats.Stats; 030import org.apache.commons.jcs3.engine.stats.behavior.IStatElement; 031import org.apache.commons.jcs3.engine.stats.behavior.IStats; 032import org.apache.commons.jcs3.log.Log; 033import org.apache.commons.jcs3.log.LogManager; 034import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager; 035 036/** 037 * An event queue is used to propagate ordered cache events to one and only one target listener. 038 * <p> 039 * This is a modified version of the experimental version. It uses a PooledExecutor and a 040 * BoundedBuffer to queue up events and execute them as threads become available. 041 * <p> 042 * The PooledExecutor is static, because presumably these processes will be IO bound, so throwing 043 * more than a few threads at them will serve no purpose other than to saturate the IO interface. In 044 * light of this, having one thread per region seems unnecessary. This may prove to be false. 045 */ 046public class PooledCacheEventQueue<K, V> 047 extends AbstractCacheEventQueue<K, V> 048{ 049 /** The logger. */ 050 private static final Log log = LogManager.getLog( PooledCacheEventQueue.class ); 051 052 /** The Thread Pool to execute events with. */ 053 protected ExecutorService pool; 054 055 /** The Thread Pool queue */ 056 protected BlockingQueue<Runnable> queue; 057 058 /** 059 * Constructor for the CacheEventQueue object 060 * <p> 061 * @param listener 062 * @param listenerId 063 * @param cacheName 064 * @param maxFailure 065 * @param waitBeforeRetry 066 * @param threadPoolName 067 */ 068 public PooledCacheEventQueue( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure, 069 final int waitBeforeRetry, final String threadPoolName ) 070 { 071 initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, threadPoolName ); 072 } 073 074 /** 075 * Initializes the queue. 076 * <p> 077 * @param listener 078 * @param listenerId 079 * @param cacheName 080 * @param maxFailure 081 * @param waitBeforeRetry 082 * @param threadPoolName 083 */ 084 protected void initialize( final ICacheListener<K, V> listener, final long listenerId, final String cacheName, final int maxFailure, 085 final int waitBeforeRetry, final String threadPoolName ) 086 { 087 super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry); 088 089 pool = createPool(threadPoolName); 090 091 if (pool instanceof ThreadPoolExecutor) 092 { 093 queue = ((ThreadPoolExecutor) pool).getQueue(); 094 } 095 } 096 097 /** 098 * Create the thread pool. 099 * <p> 100 * @param threadPoolName 101 * @since 3.1 102 */ 103 protected ExecutorService createPool(final String threadPoolName) 104 { 105 // this will share the same pool with other event queues by default. 106 return ThreadPoolManager.getInstance().getExecutorService( 107 (threadPoolName == null) ? "cache_event_queue" : threadPoolName ); 108 } 109 110 /** 111 * @return the queue type 112 */ 113 @Override 114 public QueueType getQueueType() 115 { 116 /** The type of queue -- there are pooled and single */ 117 return QueueType.POOLED; 118 } 119 120 /** 121 * Destroy the queue. Interrupt all threads. 122 */ 123 @Override 124 public synchronized void destroy() 125 { 126 if ( isWorking() ) 127 { 128 setWorking(false); 129 log.info( "Cache event queue destroyed: {0}", this ); 130 } 131 } 132 133 /** 134 * Adds an event to the queue. 135 * <p> 136 * @param event 137 */ 138 @Override 139 protected void put( final AbstractCacheEvent event ) 140 { 141 pool.execute( event ); 142 } 143 144 /** 145 * @return IStats 146 */ 147 @Override 148 public IStats getStatistics() 149 { 150 final IStats stats = new Stats(); 151 stats.setTypeName( "Pooled Cache Event Queue" ); 152 153 final ArrayList<IStatElement<?>> elems = new ArrayList<>(); 154 155 elems.add(new StatElement<>( "Working", Boolean.valueOf(isWorking()) ) ); 156 elems.add(new StatElement<>( "Empty", Boolean.valueOf(this.isEmpty()) ) ); 157 158 if ( queue != null ) 159 { 160 elems.add(new StatElement<>( "Queue Size", Integer.valueOf(queue.size()) ) ); 161 elems.add(new StatElement<>( "Queue Capacity", Integer.valueOf(queue.remainingCapacity()) ) ); 162 } 163 164 stats.setStatElements( elems ); 165 166 return stats; 167 } 168 169 /** 170 * If the Queue is using a bounded channel we can determine the size. If it is zero or we can't 171 * determine the size, we return true. 172 * <p> 173 * @return whether or not there are items in the queue 174 */ 175 @Override 176 public boolean isEmpty() 177 { 178 return size() == 0; 179 } 180 181 /** 182 * Returns the number of elements in the queue. If the queue cannot determine the size 183 * accurately it will return 0. 184 * <p> 185 * @return number of items in the queue. 186 */ 187 @Override 188 public int size() 189 { 190 if ( queue == null ) 191 { 192 return 0; 193 } 194 return queue.size(); 195 } 196}