Apache Commons logo Commons Functor

Overview

The aggregator package provides a component which can be "fed" data from various sources and apply various computations to this data to "aggregate" it. The design allows for a pluggable implementation to be used for the purpose of aggregation and also provides flexibility with regards to storing the data being fed to this component.

There are 2 implementations as far as storage is concerned : nostore and List-backed store. The former should be used when the aggregation formula is simple enough to be applied on-the-fly without having to traverse a series of data -- e.g. summarizing / adding the data -- while the latter can perform more advanced processing -- e.g. statistical calculations: median, average, percentile etc.

Storage

The List-backed store implementation stores the whole data series in a growing List such that when data is aggregated one can traverse the series and apply complex formulas (find the median of a set of numeric values for instance).

The framework allows for any List-based implementation to be plugged in, by implementing createList(), however, there is also an ArrayList-based implementation provided in ArrayListBackedAggregator which can be used out-of-the-box.

While the store implementation stores the data in a list, the nostore one stores just a single object -- every time data is fed into the Aggregator, the data stored in this object and the data passed in are "aggregated" there and then using the supplied formula and the result of this operation is stored back in the object this implementation uses to store data.

This has the implication that unlike the list-backed storage implementation (where the result of aggregating the data is not known until evaluate() is called), with the nostore implementation the result is known (and saved back) at any moment. This arguably makes this class "faster", however this comes at the cost of a slower add() operation, as the aggregation formula is applied. Also, let's remind ourselves that not all formulas can be implemented using the nostore implementation (see above).

Flushing Data

The data an aggregator handles can be "reset" at any point -- in the case of the store implementation this means emptying the List; in the case of nostore this means setting the "start value" to a "neutral" value -- for instance 0 for implementations which involve additions (Y + 0 = Y), 1 for multiplication (Y * 1 = Y) etc. This operation ultimately resets the aggregator to the initial state it was in when created in the first place. This feature allows for a caller to schedule regular resets to free up resources etc.

Timers

Retrieving the data and regular intervals then flush/reset the aggregator was an envisaged scenario for the aggregator -- so the base class AbstractTimedAggregator offers support to start a timer internally to do that. The class offers a listener mechanism where classes can register to receive timer notifications (if timer support was configured) and after all listeners have been notified the aggregator is reset. The result of evaluate() (the result of aggregating data) is passed in to the listeners. This allows for an object to simply register itself as a timer listener to the aggregator and only react to the timer notifications (e.g. write the result to a file for offline analysis etc) while the aggregator itself manages all the internals.

When the data is being flushed/reset, a TimedAggregatorListener can be registered to receive a notification. The notification is sent after the data is reset. Prior to resetting the data, evaluate() is called, and the result of the evaluation is sent to the listener.

This allows for data aggregated periodically to be processed (e.g. logged, written into a database etc) without worrying about what happens in between.

There are 2 ways the AbstractTimedAggregator can be configured with timer support:

  • Using a per-instance timer : by default, when using the 1 parameter constructor with a value > 0, the aggregator will create a Timer object internally and schedule the regular task of resetting the aggregator under this Timer. While this helps preventing memory leaks (both Aggregator and Timer will be recycled together) it does increase the threading level in the system with each aggregator object created. If your system creates a lot of AbstractTimedAggregator instances with timer support, you might want to consider using the shared timer (see below).
  • Using a shared timer : the AbstractTimedAggregator class creates a static timer instance which can be shared amongst all instances of this class, if using the constructor with 2 params. When doing so, each such instance will schedule its regular reset task against this timer. While this reduces the memory footprint and threading on the system, it can lead to memory leaks if this is not handled correctly. Therefore please make sure you invoke stop() on such aggregators when finished working with them.

It is recommended you always invoke stop() at the end of the lifecycle of an aggregator -- regardless of timer support (shared / per instance) or not -- in particular calling stop() on an aggregator with no timer support has no effect, however doing so ensures code uniformity and cleanliness.

Examples

An example of usage for this component (though not necessarily the only one) could be in a highly transactional system where, for instance, the average transaction time can be an indication of the system health. While a fully-fledged monitoring system can say monitor the system load, file system etc., you might also want your monitoring system to keep an eye on the average transaction time. One way to do so is have an AbstractTimedAggregator which is capturing each transaction execution time and every say 5 seconds it computes the arithmetic mean (average) and writes that to a log file (which then your monitoring system can tail). To do so you would use a piece of code like this:

public class AggregatorTesting implements TimedAggregatorListener<Double> {
    Aggregator<Double> agg;

    public AggregatorTesting() {
        AbstractTimedAggregator<Double> aggregator = new ArrayListBackedAggregator<Double>(new DoubleMeanValueAggregatorFunction(),
                5000L);
        aggregator.addTimerListener(this);
        this.agg = aggregator;
    }

    public void onTimer(AbstractTimedAggregator<Double> aggregator, Double evaluation) {
        double aggregated = aggregator.evaluate();
        /* log the value etc. */
    }

    private void add(double d) {
        agg.add(d);
    }

    public static void main(String[] args) {
        AggregatorTesting t = new AggregatorTesting();
        /* add data */
        t.add( 1.0 );
        t.add( 2.0 );
        /* .. */
    }
}
      

Now let's assume that you occasionally see the odd spike in transaction times -- which, for the purpose of this example, we'll assume is normal. As such you are not concerned with these spikes, so you want a formula to exclude them. Chances are an arithmetic median would be more appropriate in this case; in which case the code above can suffer just a small change:

      ...
        AbstractTimedAggregator<Double> aggregator = new ArrayListBackedAggregator<Double>(new DoubleMedianValueAggregatorFunction(),
                5000L);
       ...
      

Or maybe you want to be more precise and ensure that lets say 95% of your transactions take less than a certain execution time, so you replace the median formula with a percentile one:

      ...
        AbstractTimedAggregator<Double> aggregator = new ArrayListBackedAggregator<Double>(new DoublePercentileAggregatorFunction(95),
                5000L);
       ...
      

Or maybe your health indicator is the number of transactions going through the system every 5 seconds. In this case you can use a nostore aggregator with a sum formula like this:

public class AggregatorTesting implements TimedAggregatorListener<Double> {
    Aggregator<Double> agg;

    public AggregatorTesting() {
        AbstractTimedAggregator<Double> aggregator = new AbstractNoStoreAggregator<Double>(
                new DoubleSumAggregatorBinaryFunction(), 5000L) {
            @Override
            protected Double initialValue() {
                return 0.0;
            }
        };
        aggregator.addTimerListener(this);
        this.agg = aggregator;
    }

    public void onTimer(AbstractTimedAggregator<Double> aggregator, Double evaluation) {
        double aggregated = aggregator.evaluate();
        /* log the value etc. */
    }

    private void add(double d) {
        agg.add(d);
    }

    public static void main(String[] args) {
        AggregatorTesting t = new AggregatorTesting();
        /* add data */
        t.add(1.0);
        t.add(2.0);
        /* .. */
    }
}
      

(Bear in mind that there is also a IntegerCountAggregatorBinaryFunction too!)

This has the advantage of a lower memory footprint as well (see above). If your healthcheck indicator is based on the maximum transaction time over a 5 seconds interval, then you simply replace the aggregation function with a max value implementation:

      ...
        AbstractTimedAggregator<Double> aggregator = new AbstractNoStoreAggregator<Double>(
                new DoubleMaxAggregatorBinaryFunction(), 5000L) {
      ...
      

Or you can simply roll out your own code -- if using the nostore implementation, all you need to do is implement a BinaryFunction and pass it to the Aggregator constructor. This function will receive the already-stored aggregated value as the first parameter, and data just passed in (via add()) as the second parameter; the result of this function will be stored back in the aggregator.

For the list-based aggregators, use an instance of AbstractListBackedAggregator and pass in an instance of Function which accepts a List and returns a single object.

Have a look at the org.apache.commons.functor.aggregator.functions package which includes a set of functions to achieve various of these tasks already.

Note on class naming : You will notice in the org.apache.commons.functor.aggregator.functions package there are functions with similar names -- e.g. DoubleSumAggregatorFunction and DoubleSumAggregatorBinaryFunction. The naming convention is that if the function takes 2 parameters (i.e. it is an instance of BinaryFunction then the name of the class in this package will contain BinaryFunction, otherwise, if it is an instance of Function then the name will only contain Function.

The reason behind it is that instances of BinaryFunction can be used with AbstractNoStoreAggregator whereas the instances of Function can be used with AbstractTimedAggregator.