001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.functor.aggregator;
018
019import java.util.Collections;
020import java.util.List;
021import java.util.Timer;
022import java.util.TimerTask;
023import java.util.concurrent.CopyOnWriteArrayList;
024import java.util.concurrent.locks.ReadWriteLock;
025import java.util.concurrent.locks.ReentrantReadWriteLock;
026
027/**
028 * An aggregator which automatically resets the aggregated data at regular
029 * intervals and sends a notification when it is about to do so, so listeners
030 * can decide to gather the information before it is being reset (and log it
031 * etc). This allows for smaller memory footprints for instance in the case of
032 * List-based aggregators, as regularly the list is emptied. Also it allows for
033 * the call to <code>evaluate</code> to represent an "aggregated" value over a
034 * certain period of time. Note that you can still have a regular aggregator
035 * extending this class by specifying an interval less than or equal to zero.
036 * The regular flush/reset will be triggered from a timer which will always be
037 * started as a daemon thread (so it will stop when there are no more non-daemon
038 * threads in the JVM); this class allows 2 types of timers:
039 * <ul>
040 * <li>(default) per instance <code>Timer</code> -- each instance of this class
041 * will create a new <code>Timer</code> and this <code>Timer</code> will have a
042 * single <code>TimerTask</code> scheduled, which is the one that resets this
043 * <code>Aggregator</code> regularly and sends notifications. This way, when the
044 * <code>Aggregator</code> instance is destroyed, the <code>Timer</code> goes as
045 * well.</li>
046 * <li>shared <code>Timer</code> instance -- this class will create a static
047 * instance of <code>Timer</code> which can be shared by other instances of this
048 * class. While this is a bit more effective from a memory and thread management
049 * point of view, it has the downside that if the <code>TimerTask</code>'s are
050 * not managed properly this can create memory leaks. So if you decide to take
051 * this route make sure when you are finished with this instance, to always stop
052 * the timer at the end.</li>
053 * </ul>
054 * <p>
055 * <b>Synchronization</b>: This class provides a thread safe framework so when
056 * {@link #doAdd(Object)}, {@link #reset()} and {@link #evaluate()} is called,
057 * access is synchronized via a read-write lock. {@link #evaluate()} is
058 * considered a read operation and {@link #doAdd(Object)} and {@link #reset()}
059 * are considered write operations.
060 * </p>
061 *
062 * @param <T>
063 *            type of data to aggregate
064 */
065public abstract class AbstractTimedAggregator<T> implements Aggregator<T> {
066    /**
067     * Stores a list to all objects which are listening for time events
068     * generated by this object. If there is no timer programmed (e.g.
069     * {@link #interval} is set to 0) this list will be <code>null</code>. Under
070     * the cover, this will use a <code>CopyOnWriteArrayList</code> since there
071     * aren't too many updates expected to this list.
072     *
073     * @see #interval
074     * @see #timer
075     * @see TimedAggregatorListener
076     */
077    private List<TimedAggregatorListener<T>> timerListeners;
078
079    /**
080     * As per {@link #timer} javadoc, if the interval specified is zero or less
081     * there will be no <code>Timer</code> created/assigned to this instance.
082     * This constant is defined to make it easier to read code which creates
083     * instances of this class and doesn't assign them a timer.
084     */
085    public static final long                 NO_TIMER   = 0L;
086
087    /**
088     * Name of the shared timer which will run all the TimerTasks resulted from
089     * creating instances of this class which are set to used the shared timer.
090     * This is useful when looking at thread dumps. For instances which use
091     * their own timer task, the name will be
092     * <code>TIMER_NAME + hashCode()</code>.
093     */
094    public static final String               TIMER_NAME = "TimedSummarizerMainTimer";
095
096    /**
097     * The main shared timer which will execute all the <code>TimerTasks</code>
098     * resulted from instances of this class which chose to use the shared
099     * timer. Note that this <code>Timer</code> is started as a daemon thread so
100     * it will stop when there are no more non-daemon threads.
101     *
102     * @see #timer
103     */
104    private static final Timer               MAIN_TIMER = new Timer(TIMER_NAME, true);
105
106    /**
107     * The timer instance for this instance. Can point to {@link #MAIN_TIMER} if
108     * shared timer was chosen in constructor or a newly created instance of
109     * <code>Timer</code> which is private to this instance only.
110     *
111     * @see #MAIN_TIMER
112     */
113    private Timer                            timer;
114
115    /**
116     * Interval in milliseconds we flush the result of the "summary". This will
117     * be used to set up our <code>TimerTask</code> and schedule it with the
118     * <code>Timer</code>. Every time the timer kicks in after this interval, it
119     * will call {@link #timer()}. If this is set to a value of zero or less, no
120     * timer will be created.
121     */
122    private long                             interval;
123
124    /**
125     * This is the task that is created when a new instance of this class is
126     * created. Once created this task will be scheduled with the {@link #timer}
127     * . Calling {@link #stop()} cancels this task and also will set it to null
128     * (so it can be recycled by the garbage collection), otherwise, until that
129     * point this will store a reference to a valid <code>TimerTask</code>
130     * instance.
131     */
132    private TimerTask                        task;
133
134    /**
135     * Lock used internally to synchronize access to {@link #add(Object)},
136     * {@link #reset()} and {@link #evaluate()}. Locks for writing when
137     * {@link #add(Object)} and {@link #reset()} is called and for reading when
138     * {@link #evaluate()} is called.
139     *
140     * @see #add(Object)
141     * @see #evaluate()
142     * @see #reset()
143     */
144    private ReadWriteLock                    dataLock;
145
146    /**
147     * Default constructor -- creates an instance of this aggregator with no
148     * <code>Timer</code>. Equivalent to
149     * <code>AbstractTimedAggregator(NO_TIMER)</code>.
150     *
151     * @see #AbstractTimedAggregator(long)
152     */
153    public AbstractTimedAggregator() {
154        this(NO_TIMER);
155    }
156
157    /**
158     * Creates an aggregator which has a timer at the specified interval
159     * (miliseconds) and uses its own timer rather than the shared
160     * {@link #MAIN_TIMER}. Equivalent to
161     * <code>AbstractTimedAggregator(interval,false)</code>.
162     *
163     * @param interval
164     *            interval in miliseconds to set the timer for.
165     * @see #interval
166     * @see #timer
167     * @see #AbstractTimedAggregator(long, boolean)
168     */
169    public AbstractTimedAggregator(long interval) {
170        this(interval, false);
171    }
172
173    /**
174     * Creates an aggregator which has a timer at the specified interval and
175     * also allows control over using the {@link #MAIN_TIMER shared timer} or
176     * its own per-instance timer.
177     *
178     * @param interval
179     *            interval in miliseconds to set the timer for.
180     * @param useSharedTimer
181     *            if set to <code>true</code>, {@link #timer} will be set to
182     *            {@link #TIMER_NAME}, otherwise a new instance of
183     *            <code>Timer</code> will be created.
184     */
185    public AbstractTimedAggregator(long interval, boolean useSharedTimer) {
186        if (interval <= NO_TIMER) {
187            // not using timer
188            this.interval = NO_TIMER;
189            this.timer = null;
190            this.task = null;
191            this.timerListeners = null;
192        } else {
193            // we have been requested to use timers
194            this.interval = interval;
195            this.timerListeners = new CopyOnWriteArrayList<TimedAggregatorListener<T>>();
196            if (useSharedTimer) {
197                this.timer = MAIN_TIMER;
198            } else {
199                this.timer = new Timer(TIMER_NAME + hashCode(), true);
200            }
201            // having set up the timer, create the task
202            this.task = new TimerTask() {
203                @Override
204                public void run() {
205                    timer();
206                }
207            };
208            this.timer.scheduleAtFixedRate(this.task, this.interval, this.interval);
209        }
210        this.dataLock = new ReentrantReadWriteLock();
211    }
212
213    /**
214     * Getter for {@link #interval}.
215     *
216     * @return Current value of {@link #interval}.
217     */
218    public final long getInterval() {
219        return interval;
220    }
221
222    /**
223     * Adds the data to this aggregator. This function first locks
224     * {@link #dataLock} for writing then calls {@link #doAdd(Object)}, which
225     * allows subclasses to perform the actual adding to the aggregator and then
226     * at the end it unlocks {@link #dataLock}.
227     *
228     * @param data
229     *            Data to be added to the aggregator.
230     * @see #doAdd(Object)
231     * @see #dataLock
232     */
233    public final void add(T data) {
234        dataLock.writeLock().lock();
235        try {
236            doAdd(data);
237        } finally {
238            dataLock.writeLock().unlock();
239        }
240    }
241
242    /**
243     * Function provided to allow subclasses to perform the actual adding of the
244     * data to the aggregator. This function is wrapped by {@link #add(Object)}
245     * so that access to any internal data series (implemented by subclasses)
246     * via {@link #add(Object)} or {@link #evaluate()} or {@link #reset()} is
247     * prohibited during this call, as a <b>write</b> lock is acquired prior to
248     * this function call to ensure this function is the only one which has
249     * access to the data.
250     *
251     * @param data
252     *            Data to be aggregated
253     * @see #add(Object)
254     */
255    protected abstract void doAdd(T data);
256
257    /**
258     * Aggregates all the data this object has been "fed" via calls to
259     * {@link #add(Object)}. Note that this object delegates the call to
260     * {@link #doEvaluate()} after it secured read-only access to
261     * {@link #dataLock} -- so any data series access can be safely read
262     * (however, subclasses should NOT try to modify any data series they might
263     * implement at this point!). The lock is released after
264     * {@link #doEvaluate()} returns.
265     *
266     * @return result of aggregating the data, as returned by
267     *         {@link #doEvaluate()}
268     * @see #doEvaluate()
269     */
270    public final T evaluate() {
271        dataLock.readLock().lock();
272        try {
273            return doEvaluate();
274        } finally {
275            dataLock.readLock().unlock();
276        }
277    }
278
279    /**
280     * Allows subclasses to perform the actual evaluation of the aggregated
281     * result in a thread-safe manner. When this function is called,
282     * <b>write</b> access to data (via {@link #add(Object)} and
283     * {@link #reset()}) is prohibited until this function finishes. However,
284     * please note that other read access (via calls to the same
285     * {@link #evaluate()}) is possible.
286     *
287     * @return Result of evaluating the aggregated data
288     */
289    protected abstract T doEvaluate();
290
291    /**
292     * Resets this aggregator.This function first locks {@link #dataLock} for
293     * writing then calls {@link #doReset()}, which allows subclasses to perform
294     * the actual resetting of the aggregator and then at the end it unlocks
295     * {@link #dataLock}.
296     *
297     * @see #doReset()
298     */
299    public final void reset() {
300        dataLock.writeLock().lock();
301        try {
302            doReset();
303        } finally {
304            dataLock.writeLock().unlock();
305        }
306    }
307
308    /**
309     * Function provided to allow subclasses to perform the actual reset of the
310     * aggregator. This function is wrapped by {@link #reset()} so that access
311     * to data (via {@link #add(Object)} or {@link #evaluate()} or
312     * {@link #reset()}) is prohibited during this call, as a <b>write</b> lock
313     * is acquired prior to this function call to ensure this function is the
314     * only one which has access to the data.
315     */
316    protected abstract void doReset();
317
318    /**
319     * Retrieves the size of the currently-stored data series. This function
320     * first locks {@link #dataLock} for reading then calls
321     * {@link #retrieveDataSize()}, which allows subclasses to compute the data
322     * series size and then at the end it unlocks {@link #dataLock}.
323     *
324     * @return Size of the current data series, which will be aggregated at the
325     *         next call to {@link #evaluate()}
326     */
327    public final int getDataSize() {
328        dataLock.readLock().lock();
329        try {
330            return retrieveDataSize();
331        } finally {
332            dataLock.readLock().unlock();
333        }
334    }
335
336    /**
337     * Function provided to allow subclasses to retrieve the actual size of the
338     * data series. This function is wrapped by {@link #getDataSize()} so that
339     * access to data (via {@link #add(Object)} or {@link #reset()}) is
340     * prohibited during this call, as a <b>read</b> lock is acquired prior to
341     * this function call. (However, calls to {@link #evaluate()} are allowed as
342     * that locks for reading too.)
343     *
344     * @return Size of the current data series. Zero means no data stored.
345     */
346    protected abstract int retrieveDataSize();
347
348    /**
349     * Retrieves <b>an unmodifiable copy</b> of the {@link #timerListeners timer
350     * listeners}. Used for testing.
351     *
352     * @return <code>Collections.unmodifiableList(timerListeners)</code> if
353     *         {@link #timerListeners} is <b>not</b> <code>null</code>, or
354     *         <code>null</code> otherwise.
355     */
356    final List<TimedAggregatorListener<T>> getTimerListeners() {
357        if (timerListeners == null) {
358            return null;
359        }
360        return Collections.unmodifiableList(timerListeners);
361    }
362
363    /**
364     * If this <code>Aggregator</code> has been started with timer support, it
365     * will add the given listener, so it receives
366     * {@link TimedAggregatorListener#onTimer(AbstractTimedAggregator,Object)
367     * timer events}. If no timer support has been configured for this
368     * Aggregator, this call has no effect.
369     *
370     * @param listener
371     *            Listener to be added to received timer events from this
372     *            aggregator.
373     * @see #timerListeners
374     */
375    public final void addTimerListener(TimedAggregatorListener<T> listener) {
376        if (timerListeners == null) {
377            return;
378        }
379        timerListeners.add(listener);
380    }
381
382    /**
383     * Removes a listener from the timer listeners list if previously added. If
384     * this Aggregator has been configured with no timer support, this call will
385     * always return <code>false</code>.
386     *
387     * @param listener
388     *            Listener to be removed from the list. NullPointerException
389     *            thrown if this is null.
390     * @return <code>true</code> if this Aggregator has timer support and the
391     *         listener passed in was previously added (via
392     *         {@link #addTimerListener(TimedAggregatorListener)}) or false if
393     *         either the Aggregator has no timer support or it has timer
394     *         support but the listener was never registered with this
395     *         Aggregator.
396     * @see #timerListeners
397     */
398    public final boolean removeTimerListener(TimedAggregatorListener<T> listener) {
399        if (timerListeners == null) {
400            return false;
401        }
402        return timerListeners.remove(listener);
403    }
404
405    /**
406     * Computes the current aggregated value (by calling {@link #evaluate()},
407     * resets this aggregator then notifies all listeners. Go through all the
408     * {@link #timerListeners} and sends
409     * {@link TimedAggregatorListener#onTimer(AbstractTimedAggregator,Object)
410     * notification messages} to each of them. Does nothing if
411     * {@link #timerListeners} is <code>null</code>. Please note that
412     * {@link #evaluate()} is called only once at the beginning of this
413     * function, and only if there are listeners configured, then this value is
414     * passed to every notification. This is in order to ensure all listeners
415     * receive the same value -- the value of the evaluation prior to resetting
416     * it.
417     */
418    private void timer() {
419        if (timerListeners != null) {
420            // if we have listeners, notify them
421            T aggregated = evaluate(); // NOTE: shouldn't evaluate() and reset()
422                                       // be done atomically here?
423            reset();
424            for (TimedAggregatorListener<T> i : timerListeners) {
425                i.onTimer(this, aggregated);
426            }
427        } else {
428            reset();
429        }
430    }
431
432    /**
433     * Checks whether this instance has a timer associated with it or not. If
434     * there is a timer for this Aggregator, then the {@link #task} member
435     * should be set to a non-null value.
436     *
437     * @return <code>true</code> if {@link #task} is not null,
438     *         <code>false</code> otherwise (in which case there is no timer).
439     */
440    public final boolean isTimerEnabled() {
441        return (task != null);
442    }
443
444    /**
445     * Checks whether this instance uses its own timer or {@link #MAIN_TIMER the
446     * shared timer} for scheduling {@link #task the timer task}.
447     *
448     * @return <code>true</code> if <code>timer == MAIN_TIMER</code> or
449     *         <code>false</code> otherwise.
450     */
451    public final boolean isSharedTimer() {
452        return (timer == MAIN_TIMER);
453    }
454
455    /**
456     * Cancels the current timer task (if set) -- which means from there on the
457     * data will not be reset anymore. Also, if {@link #timer} is not set to
458     * {@link #MAIN_TIMER the shared timer} then it will be cancelled as well
459     * Also releases all the listeners from the {@link #timerListeners list}.
460     */
461    public final void stop() {
462        // cancel the task first
463        if (task != null) {
464            task.cancel();
465            task = null;
466            timer.purge(); // remove the reference to this task
467        }
468        // then the timer if needed
469        if (timer != null && timer != MAIN_TIMER) {
470            timer.cancel();
471        }
472        timer = null;
473        // finally remove the elements from the listeners list
474        if (timerListeners != null) {
475            timerListeners.clear();
476        }
477    }
478
479    @Override
480    protected final void finalize() throws Throwable {
481        // if we're going in the garbage, make sure we clean up
482        stop();
483    }
484
485    @Override
486    public String toString() {
487        return AbstractTimedAggregator.class.getName();
488    }
489}