/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.functor.aggregator;

import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * An aggregator which automatically resets the aggregated data at regular
 * intervals and sends a notification when it does so, so listeners can gather
 * the information accumulated over the interval (and log it etc). This allows
 * for smaller memory footprints, for instance in the case of List-based
 * aggregators, as the list is emptied regularly. It also allows the value
 * handed to listeners to represent an "aggregated" value over a certain period
 * of time. Note that you can still have a regular aggregator extending this
 * class by specifying an interval less than or equal to zero.
 * <p>
 * The regular flush/reset is triggered from a timer which is always started as
 * a daemon thread (so it will stop when there are no more non-daemon threads in
 * the JVM); this class allows 2 types of timers:
 * </p>
 * <ul>
 * <li>(default) per instance <code>Timer</code> -- each instance of this class
 * creates a new <code>Timer</code> which has a single <code>TimerTask</code>
 * scheduled: the one that resets this <code>Aggregator</code> regularly and
 * sends notifications.</li>
 * <li>shared <code>Timer</code> instance -- a static instance of
 * <code>Timer</code> is shared by all instances of this class that ask for it.
 * While this is a bit more effective from a memory and thread management point
 * of view, it has the downside that if the <code>TimerTask</code>'s are not
 * managed properly this can create memory leaks.</li>
 * </ul>
 * <p>
 * In both cases the scheduled task refers back to this aggregator, so do not
 * rely on garbage collection to stop the timer: always call {@link #stop()}
 * when you are finished with an instance.
 * </p>
 * <p>
 * <b>Synchronization</b>: This class provides a thread safe framework so when
 * {@link #add(Object)}, {@link #reset()} and {@link #evaluate()} are called,
 * access is synchronized via a read-write lock. {@link #evaluate()} is
 * considered a read operation and {@link #add(Object)} and {@link #reset()}
 * are considered write operations. The periodic timer evaluates and resets the
 * data under a single write lock, so no data added concurrently is lost
 * between the two steps.
 * </p>
 *
 * @param <T>
 *            type of data to aggregate
 */
public abstract class AbstractTimedAggregator<T> implements Aggregator<T> {
    /**
     * As per {@link #timer} javadoc, if the interval specified is zero or less
     * there will be no <code>Timer</code> created/assigned to this instance.
     * This constant is defined to make it easier to read code which creates
     * instances of this class and doesn't assign them a timer.
     */
    public static final long NO_TIMER = 0L;

    /**
     * Name of the shared timer which will run all the TimerTasks resulting
     * from creating instances of this class which are set to use the shared
     * timer. This is useful when looking at thread dumps. For instances which
     * use their own timer, the name will be
     * <code>TIMER_NAME + hashCode()</code>.
     */
    public static final String TIMER_NAME = "TimedSummarizerMainTimer";

    /**
     * The main shared timer which will execute all the <code>TimerTasks</code>
     * resulting from instances of this class which chose to use the shared
     * timer. Note that this <code>Timer</code> is started as a daemon thread so
     * it will stop when there are no more non-daemon threads.
     *
     * @see #timer
     */
    private static final Timer MAIN_TIMER = new Timer(TIMER_NAME, true);

    /**
     * Stores a list of all objects which are listening for time events
     * generated by this object. If there is no timer programmed (e.g.
     * {@link #interval} is set to 0) this list will be <code>null</code>.
     * Under the cover, this uses a <code>CopyOnWriteArrayList</code> since
     * there aren't too many updates expected to this list.
     *
     * @see #interval
     * @see #timer
     * @see TimedAggregatorListener
     */
    private final List<TimedAggregatorListener<T>> timerListeners;

    /**
     * The timer instance for this instance. Can point to {@link #MAIN_TIMER} if
     * shared timer was chosen in constructor or a newly created instance of
     * <code>Timer</code> which is private to this instance only. Set to
     * <code>null</code> when no timer was requested and after {@link #stop()}.
     * Declared volatile because it is written by {@link #stop()} and read by
     * {@link #isSharedTimer()}, possibly from different threads.
     *
     * @see #MAIN_TIMER
     */
    private volatile Timer timer;

    /**
     * Interval in milliseconds at which we flush the result of the "summary".
     * This is used to set up our <code>TimerTask</code> and schedule it with
     * the <code>Timer</code>. Every time the timer kicks in after this
     * interval, it will call {@link #timer()}. If this is set to a value of
     * zero or less, no timer will be created.
     */
    private final long interval;

    /**
     * This is the task that is created when a new instance of this class is
     * created with a timer. Once created this task is scheduled with the
     * {@link #timer}. Calling {@link #stop()} cancels this task and also sets
     * it to null (so it can be recycled by the garbage collection); until that
     * point this stores a reference to a valid <code>TimerTask</code>
     * instance. Declared volatile for the same reason as {@link #timer}.
     */
    private volatile TimerTask task;

    /**
     * Lock used internally to synchronize access to {@link #add(Object)},
     * {@link #reset()} and {@link #evaluate()}. Locks for writing when
     * {@link #add(Object)} and {@link #reset()} are called and for reading
     * when {@link #evaluate()} is called. Initialized at field level so it is
     * already in place when the constructor schedules the timer task.
     *
     * @see #add(Object)
     * @see #evaluate()
     * @see #reset()
     */
    private final ReadWriteLock dataLock = new ReentrantReadWriteLock();

    /**
     * Default constructor -- creates an instance of this aggregator with no
     * <code>Timer</code>. Equivalent to
     * <code>AbstractTimedAggregator(NO_TIMER)</code>.
     *
     * @see #AbstractTimedAggregator(long)
     */
    public AbstractTimedAggregator() {
        this(NO_TIMER);
    }

    /**
     * Creates an aggregator which has a timer at the specified interval
     * (milliseconds) and uses its own timer rather than the shared
     * {@link #MAIN_TIMER}. Equivalent to
     * <code>AbstractTimedAggregator(interval,false)</code>.
     *
     * @param interval
     *            interval in milliseconds to set the timer for.
     * @see #interval
     * @see #timer
     * @see #AbstractTimedAggregator(long, boolean)
     */
    public AbstractTimedAggregator(long interval) {
        this(interval, false);
    }

    /**
     * Creates an aggregator which has a timer at the specified interval and
     * also allows control over using the {@link #MAIN_TIMER shared timer} or
     * its own per-instance timer.
     *
     * @param interval
     *            interval in milliseconds to set the timer for. A value of
     *            zero or less means no timer is created.
     * @param useSharedTimer
     *            if set to <code>true</code>, {@link #timer} will be set to
     *            {@link #MAIN_TIMER}, otherwise a new instance of
     *            <code>Timer</code> will be created.
     */
    public AbstractTimedAggregator(long interval, boolean useSharedTimer) {
        if (interval <= NO_TIMER) {
            // not using timer
            this.interval = NO_TIMER;
            this.timer = null;
            this.task = null;
            this.timerListeners = null;
        } else {
            // we have been requested to use timers
            this.interval = interval;
            this.timerListeners = new CopyOnWriteArrayList<TimedAggregatorListener<T>>();
            final Timer scheduler;
            if (useSharedTimer) {
                scheduler = MAIN_TIMER;
            } else {
                scheduler = new Timer(TIMER_NAME + hashCode(), true);
            }
            // having set up the timer, create the task
            final TimerTask resetTask = new TimerTask() {
                @Override
                public void run() {
                    timer();
                }
            };
            this.timer = scheduler;
            this.task = resetTask;
            // Scheduling is the very last step: every field of this class
            // (including dataLock) is assigned before the timer thread can
            // call back into this instance.
            scheduler.scheduleAtFixedRate(resetTask, this.interval, this.interval);
        }
    }

    /**
     * Getter for {@link #interval}.
     *
     * @return Current value of {@link #interval}.
     */
    public final long getInterval() {
        return interval;
    }

    /**
     * Adds the data to this aggregator. This function first locks
     * {@link #dataLock} for writing then calls {@link #doAdd(Object)}, which
     * allows subclasses to perform the actual adding to the aggregator and then
     * at the end it unlocks {@link #dataLock}.
     *
     * @param data
     *            Data to be added to the aggregator.
     * @see #doAdd(Object)
     * @see #dataLock
     */
    public final void add(T data) {
        dataLock.writeLock().lock();
        try {
            doAdd(data);
        } finally {
            dataLock.writeLock().unlock();
        }
    }

    /**
     * Function provided to allow subclasses to perform the actual adding of the
     * data to the aggregator. This function is wrapped by {@link #add(Object)}
     * so that access to any internal data series (implemented by subclasses)
     * via {@link #add(Object)} or {@link #evaluate()} or {@link #reset()} is
     * prohibited during this call, as a <b>write</b> lock is acquired prior to
     * this function call to ensure this function is the only one which has
     * access to the data.
     *
     * @param data
     *            Data to be aggregated
     * @see #add(Object)
     */
    protected abstract void doAdd(T data);

    /**
     * Aggregates all the data this object has been "fed" via calls to
     * {@link #add(Object)}. Note that this object delegates the call to
     * {@link #doEvaluate()} after it secured read-only access to
     * {@link #dataLock} -- so any data series can be safely read (however,
     * subclasses should NOT try to modify any data series they might
     * implement at this point!). The lock is released after
     * {@link #doEvaluate()} returns.
     *
     * @return result of aggregating the data, as returned by
     *         {@link #doEvaluate()}
     * @see #doEvaluate()
     */
    public final T evaluate() {
        dataLock.readLock().lock();
        try {
            return doEvaluate();
        } finally {
            dataLock.readLock().unlock();
        }
    }

    /**
     * Allows subclasses to perform the actual evaluation of the aggregated
     * result in a thread-safe manner. When this function is called via
     * {@link #evaluate()}, <b>write</b> access to data (via
     * {@link #add(Object)} and {@link #reset()}) is prohibited until this
     * function finishes, but other read access (via calls to the same
     * {@link #evaluate()}) is possible. The periodic timer also calls this
     * function, in that case while holding the write lock.
     *
     * @return Result of evaluating the aggregated data
     */
    protected abstract T doEvaluate();

    /**
     * Resets this aggregator. This function first locks {@link #dataLock} for
     * writing then calls {@link #doReset()}, which allows subclasses to perform
     * the actual resetting of the aggregator and then at the end it unlocks
     * {@link #dataLock}.
     *
     * @see #doReset()
     */
    public final void reset() {
        dataLock.writeLock().lock();
        try {
            doReset();
        } finally {
            dataLock.writeLock().unlock();
        }
    }

    /**
     * Function provided to allow subclasses to perform the actual reset of the
     * aggregator. This function is wrapped by {@link #reset()} so that access
     * to data (via {@link #add(Object)} or {@link #evaluate()} or
     * {@link #reset()}) is prohibited during this call, as a <b>write</b> lock
     * is acquired prior to this function call to ensure this function is the
     * only one which has access to the data.
     */
    protected abstract void doReset();

    /**
     * Retrieves the size of the currently-stored data series. This function
     * first locks {@link #dataLock} for reading then calls
     * {@link #retrieveDataSize()}, which allows subclasses to compute the data
     * series size and then at the end it unlocks {@link #dataLock}.
     *
     * @return Size of the current data series, which will be aggregated at the
     *         next call to {@link #evaluate()}
     */
    public final int getDataSize() {
        dataLock.readLock().lock();
        try {
            return retrieveDataSize();
        } finally {
            dataLock.readLock().unlock();
        }
    }

    /**
     * Function provided to allow subclasses to retrieve the actual size of the
     * data series. This function is wrapped by {@link #getDataSize()} so that
     * access to data (via {@link #add(Object)} or {@link #reset()}) is
     * prohibited during this call, as a <b>read</b> lock is acquired prior to
     * this function call. (However, calls to {@link #evaluate()} are allowed as
     * that locks for reading too.)
     *
     * @return Size of the current data series. Zero means no data stored.
     */
    protected abstract int retrieveDataSize();

    /**
     * Retrieves <b>an unmodifiable view</b> of the {@link #timerListeners timer
     * listeners}. Used for testing.
     *
     * @return <code>Collections.unmodifiableList(timerListeners)</code> if
     *         {@link #timerListeners} is <b>not</b> <code>null</code>, or
     *         <code>null</code> otherwise.
     */
    final List<TimedAggregatorListener<T>> getTimerListeners() {
        if (timerListeners == null) {
            return null;
        }
        return Collections.unmodifiableList(timerListeners);
    }

    /**
     * If this <code>Aggregator</code> has been started with timer support, it
     * will add the given listener, so it receives
     * {@link TimedAggregatorListener#onTimer(AbstractTimedAggregator,Object)
     * timer events}. If no timer support has been configured for this
     * Aggregator, this call has no effect.
     *
     * @param listener
     *            Listener to be added to receive timer events from this
     *            aggregator.
     * @see #timerListeners
     */
    public final void addTimerListener(TimedAggregatorListener<T> listener) {
        if (timerListeners == null) {
            return;
        }
        timerListeners.add(listener);
    }

    /**
     * Removes a listener from the timer listeners list if previously added. If
     * this Aggregator has been configured with no timer support, this call will
     * always return <code>false</code>.
     *
     * @param listener
     *            Listener to be removed from the list.
     * @return <code>true</code> if this Aggregator has timer support and the
     *         listener passed in was previously added (via
     *         {@link #addTimerListener(TimedAggregatorListener)}) or false if
     *         either the Aggregator has no timer support or it has timer
     *         support but the listener was never registered with this
     *         Aggregator.
     * @see #timerListeners
     */
    public final boolean removeTimerListener(TimedAggregatorListener<T> listener) {
        if (timerListeners == null) {
            return false;
        }
        return timerListeners.remove(listener);
    }

    /**
     * Invoked by the scheduled {@link #task} at every interval. Computes the
     * current aggregated value and resets this aggregator as one atomic step
     * (both happen under a single write lock, so data added concurrently is
     * either part of the reported value or kept for the next interval), then
     * sends
     * {@link TimedAggregatorListener#onTimer(AbstractTimedAggregator,Object)
     * notification messages} to every listener in {@link #timerListeners}.
     * The value is computed only once so all listeners receive the same value
     * -- the value of the evaluation prior to the reset. If
     * {@link #timerListeners} is <code>null</code> the aggregator is only
     * reset.
     */
    private void timer() {
        if (timerListeners == null) {
            reset();
            return;
        }
        final T aggregated;
        dataLock.writeLock().lock();
        try {
            aggregated = doEvaluate();
            doReset();
        } finally {
            dataLock.writeLock().unlock();
        }
        // Listeners are notified after the lock is released so that they may
        // call back into add()/evaluate()/reset() without holding up writers.
        notifyListeners(aggregated);
    }

    /**
     * Sends the given value to every registered timer listener. A listener
     * that throws a <code>RuntimeException</code> does not prevent the
     * remaining listeners from being notified and does not terminate the timer
     * thread (which, for the shared timer, would stop every aggregator using
     * it); the exception is handed to the current thread's uncaught exception
     * handler instead so it is still reported.
     *
     * @param aggregated
     *            value to pass to each listener
     */
    private void notifyListeners(T aggregated) {
        for (TimedAggregatorListener<T> listener : timerListeners) {
            try {
                listener.onTimer(this, aggregated);
            } catch (RuntimeException e) {
                final Thread current = Thread.currentThread();
                current.getUncaughtExceptionHandler().uncaughtException(current, e);
            }
        }
    }

    /**
     * Checks whether this instance has a timer associated with it or not. If
     * there is a timer for this Aggregator, then the {@link #task} member
     * should be set to a non-null value.
     *
     * @return <code>true</code> if {@link #task} is not null,
     *         <code>false</code> otherwise (in which case there is no timer).
     */
    public final boolean isTimerEnabled() {
        return (task != null);
    }

    /**
     * Checks whether this instance uses its own timer or {@link #MAIN_TIMER the
     * shared timer} for scheduling {@link #task the timer task}.
     *
     * @return <code>true</code> if <code>timer == MAIN_TIMER</code> or
     *         <code>false</code> otherwise.
     */
    public final boolean isSharedTimer() {
        return (timer == MAIN_TIMER);
    }

    /**
     * Cancels the current timer task (if set) -- which means from there on the
     * data will not be reset anymore. Also, if {@link #timer} is not set to
     * {@link #MAIN_TIMER the shared timer} then it will be cancelled as well.
     * Also releases all the listeners from the {@link #timerListeners list}.
     * Calling this method more than once is harmless.
     */
    public final void stop() {
        // Work on local snapshots so a concurrent stop() (e.g. from the
        // finalizer) cannot null a field between the check and the use.
        final TimerTask currentTask = task;
        final Timer currentTimer = timer;
        task = null;
        timer = null;
        // cancel the task first
        if (currentTask != null) {
            currentTask.cancel();
        }
        if (currentTimer != null) {
            if (currentTask != null) {
                currentTimer.purge(); // remove the reference to this task
            }
            // then the timer if needed
            if (currentTimer != MAIN_TIMER) {
                currentTimer.cancel();
            }
        }
        // finally remove the elements from the listeners list
        if (timerListeners != null) {
            timerListeners.clear();
        }
    }

    /**
     * Makes sure the timer resources are released if this instance is
     * finalized without {@link #stop()} having been called.
     *
     * @throws Throwable
     *             as declared by {@link Object#finalize()}
     */
    @Override
    protected final void finalize() throws Throwable {
        // if we're going in the garbage, make sure we clean up
        try {
            stop();
        } finally {
            super.finalize();
        }
    }

    /**
     * Returns the fully qualified name of this abstract class.
     *
     * @return <code>AbstractTimedAggregator.class.getName()</code>
     */
    @Override
    public String toString() {
        return AbstractTimedAggregator.class.getName();
    }
}