001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.collections4.bloomfilter;
018
019import java.util.Arrays;
020import java.util.Objects;
021import java.util.function.IntPredicate;
022import java.util.function.LongPredicate;
023import java.util.stream.IntStream;
024
025/**
026 * A counting Bloom filter using an int array to track cells for each enabled bit.
027 *
028 * <p>Any operation that results in negative counts or integer overflow of
029 * counts will mark this filter as invalid. This transition is not reversible.
030 * The operation is completed in full, no exception is raised and the state is
031 * set to invalid. This allows the cells for the filter immediately prior to the
032 * operation that created the invalid state to be recovered. See the documentation
033 * in {@link #isValid()} for details.</p>
034 *
035 * <p>All the operations in the filter assume the cells are currently valid,
036 * for example {@code cardinality} or {@code contains} operations. Behavior of an invalid
037 * filter is undefined. It will no longer function identically to a standard
038 * Bloom filter that is the merge of all the Bloom filters that have been added
039 * to and not later subtracted from the counting Bloom filter.</p>
040 *
 * <p>The maximum supported number of items that can be stored in the filter is
 * limited by the maximum array size combined with the {@link Shape}. For
 * example, an implementation using a {@link Shape} with a false-positive
 * probability of 1e-6 and {@link Integer#MAX_VALUE} bits can reversibly store
 * approximately 75 million items using 20 hash functions per item with a memory
 * consumption of approximately 8 GB.</p>
047 *
048 * @see Shape
049 * @see CellProducer
050 * @since 4.5
051 */
052public final class ArrayCountingBloomFilter implements CountingBloomFilter {
053
054    /**
055     * The shape of this Bloom filter.
056     */
057    private final Shape shape;
058
059    /**
060     * The cell for each bit index in the filter.
061     */
062    private final int[] cells;
063
064    /**
065     * The state flag. This is a bitwise {@code OR} of the entire history of all updated
066     * cells. If negative then a negative cell or integer overflow has occurred on
067     * one or more cells in the history of the filter and the state is invalid.
068     *
069     * <p>Maintenance of this state flag is branch-free for improved performance. It
070     * eliminates a conditional check for a negative cell during remove/subtract
071     * operations and a conditional check for integer overflow during merge/add
072     * operations.</p>
073     *
074     * <p>Note: Integer overflow is unlikely in realistic usage scenarios. A cell
075     * that overflows indicates that the number of items in the filter exceeds the
076     * maximum possible size (number of bits) of any Bloom filter constrained by
077     * integer indices. At this point the filter is most likely full (all bits are
078     * non-zero) and thus useless.</p>
079     *
080     * <p>Negative cells are a concern if the filter is used incorrectly by
081     * removing an item that was never added. It is expected that a user of a
082     * counting Bloom filter will not perform this action as it is a mistake.
083     * Enabling an explicit recovery path for negative or overflow cells is a major
084     * performance burden not deemed necessary for the unlikely scenarios when an
085     * invalid state is created. Maintenance of the state flag is a concession to
086     * flag improper use that should not have a major performance impact.</p>
087     */
088    private int state;
089
090    private ArrayCountingBloomFilter(final ArrayCountingBloomFilter source) {
091        this.shape = source.shape;
092        this.state = source.state;
093        this.cells = source.cells.clone();
094    }
095
096    /**
097     * Constructs an empty counting Bloom filter with the specified shape.
098     *
099     * @param shape the shape of the filter
100     */
101    public ArrayCountingBloomFilter(final Shape shape) {
102        Objects.requireNonNull(shape, "shape");
103        this.shape = shape;
104        cells = new int[shape.getNumberOfBits()];
105    }
106
107    @Override
108    public boolean add(final CellProducer other) {
109        Objects.requireNonNull(other, "other");
110        other.forEachCell(this::add);
111        return isValid();
112    }
113
114    /**
115     * Add to the cell for the bit index.
116     *
117     * @param idx the index
118     * @param addend the amount to add
119     * @return {@code true} always.
120     */
121    private boolean add(final int idx, final int addend) {
122        try {
123            final int updated = cells[idx] + addend;
124            state |= updated;
125            cells[idx] = updated;
126            return true;
127        } catch (final IndexOutOfBoundsException e) {
128            throw new IllegalArgumentException(
129                    String.format("Filter only accepts values in the [0,%d) range", getShape().getNumberOfBits()), e);
130        }
131    }
132
133    @Override
134    public int[] asIndexArray() {
135        return IntStream.range(0, cells.length).filter(i -> cells[i] > 0).toArray();
136    }
137
138    @Override
139    public int cardinality() {
140        return (int) IntStream.range(0, cells.length).filter(i -> cells[i] > 0).count();
141    }
142
143    @Override
144    public int characteristics() {
145        return SPARSE;
146    }
147
148    @Override
149    public void clear() {
150        Arrays.fill(cells, 0);
151    }
152
153    @Override
154    public boolean contains(final BitMapProducer bitMapProducer) {
155        return contains(IndexProducer.fromBitMapProducer(bitMapProducer));
156    }
157
158    @Override
159    public boolean contains(final IndexProducer indexProducer) {
160        return indexProducer.forEachIndex(idx -> this.cells[idx] != 0);
161    }
162
163    @Override
164    public ArrayCountingBloomFilter copy() {
165        return new ArrayCountingBloomFilter(this);
166    }
167
168    @Override
169    public boolean forEachBitMap(final LongPredicate consumer) {
170        Objects.requireNonNull(consumer, "consumer");
171        final int blocksm1 = BitMap.numberOfBitMaps(cells.length) - 1;
172        int i = 0;
173        long value;
174        // must break final block separate as the number of bits may not fall on the long boundary
175        for (int j = 0; j < blocksm1; j++) {
176            value = 0;
177            for (int k = 0; k < Long.SIZE; k++) {
178                if (cells[i++] != 0) {
179                    value |= BitMap.getLongBit(k);
180                }
181            }
182            if (!consumer.test(value)) {
183                return false;
184            }
185        }
186        // Final block
187        value = 0;
188        for (int k = 0; i < cells.length; k++) {
189            if (cells[i++] != 0) {
190                value |= BitMap.getLongBit(k);
191            }
192        }
193        return consumer.test(value);
194    }
195
196    @Override
197    public boolean forEachCell(final CellProducer.CellConsumer consumer) {
198        Objects.requireNonNull(consumer, "consumer");
199        for (int i = 0; i < cells.length; i++) {
200            if (cells[i] != 0 && !consumer.test(i, cells[i])) {
201                return false;
202            }
203        }
204        return true;
205    }
206
207    @Override
208    public boolean forEachIndex(final IntPredicate consumer) {
209        Objects.requireNonNull(consumer, "consumer");
210        for (int i = 0; i < cells.length; i++) {
211            if (cells[i] != 0 && !consumer.test(i)) {
212                return false;
213            }
214        }
215        return true;
216    }
217
218    @Override
219    public int getMaxCell() {
220        return Integer.MAX_VALUE;
221    }
222
223    @Override
224    public int getMaxInsert(final CellProducer cellProducer) {
225        final int[] max = {Integer.MAX_VALUE};
226        cellProducer.forEachCell( (x, y) -> {
227            final int count = cells[x] / y;
228            if (count < max[0]) {
229                max[0] = count;
230            }
231            return max[0] > 0;
232        });
233        return max[0];
234    }
235
236    @Override
237    public Shape getShape() {
238        return shape;
239    }
240
241    /**
242     * {@inheritDoc}
243     *
244     * <p><em>Implementation note</em>
245     *
246     * <p>The state transition to invalid is permanent.</p>
247     *
248     * <p>This implementation does not correct negative cells to zero or integer
249     * overflow cells to {@link Integer#MAX_VALUE}. Thus the operation that
250     * generated invalid cells can be reversed by using the complement of the
251     * original operation with the same Bloom filter. This will restore the cells
252     * to the state prior to the invalid operation. Cells can then be extracted
253     * using {@link #forEachCell(CellConsumer)}.</p>
254     */
255    @Override
256    public boolean isValid() {
257        return state >= 0;
258    }
259
260    @Override
261    public boolean subtract(final CellProducer other) {
262        Objects.requireNonNull(other, "other");
263        other.forEachCell(this::subtract);
264        return isValid();
265    }
266
267    /**
268     * Subtract from the cell for the bit index.
269     *
270     * @param idx the index
271     * @param subtrahend the amount to subtract
272     * @return {@code true} always.
273     */
274    private boolean subtract(final int idx, final int subtrahend) {
275        try {
276            final int updated = cells[idx] - subtrahend;
277            state |= updated;
278            cells[idx] = updated;
279            return true;
280        } catch (final IndexOutOfBoundsException e) {
281            throw new IllegalArgumentException(
282                    String.format("Filter only accepts values in the [0,%d) range", getShape().getNumberOfBits()), e);
283        }
284    }
285}