View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.numbers.examples.jmh.arrays;
18  
19  import java.util.Arrays;
20  import java.util.function.IntConsumer;
21  
22  /**
23   * A fixed size set of indices within an inclusive range {@code [left, right]}.
24   *
25   * <p>This is a specialised class to implement a reduced API similar to a
26   * {@link java.util.BitSet}. It uses no bounds range checks and supports only a
27   * fixed size. It contains the methods required to store and look-up intervals of indices.
28   *
29   * <p>An offset is supported to allow the fixed size to cover a range of indices starting
30   * above 0 with the most efficient usage of storage.
31   *
32   * <p>The class has methods to directly set and get bits in the range.
33   * Implementations of the {@link PivotCache} interface use range checks and maintain
34   * floating pivots flanking the range to allow bracketing any index within the range.
35   *
36   * <p>Stores all pivots between the support {@code [left, right]}. Uses two
37   * floating pivots which are the closest known pivots surrounding this range.
38   *
39   * <p>See the BloomFilter code in Commons Collections for use of long[] data to store
40   * bits.
41   *
42   * @since 1.2
43   */
44  final class IndexSet implements PivotCache, SearchableInterval, SearchableInterval2 {
45      /** All 64 bits set. */
46      private static final long LONG_MASK = -1L;
47      /** A bit shift to apply to an integer to divide by 64 (2^6). */
48      private static final int DIVIDE_BY_64 = 6;
49      /** Default value for an unset upper floating pivot.
50       * Set as a value higher than any valid array index. */
51      private static final int UPPER_DEFAULT = Integer.MAX_VALUE;
52  
53      /** Bit indexes. */
54      private final long[] data;
55  
56      /** Left bound of the support. */
57      private final int left;
58      /** Right bound of the support. */
59      private final int right;
60      /** The upstream pivot closest to the left bound of the support.
61       * Provides a lower search bound for the range [left, right]. */
62      private int lowerPivot = -1;
63      /** The downstream pivot closest to the right bound of the support.
64       * Provides an upper search bound for the range [left, right]. */
65      private int upperPivot = UPPER_DEFAULT;
66  
67      /**
68       * Create an instance to store indices within the range {@code [left, right]}.
69       *
70       * @param left Lower bound (inclusive).
71       * @param right Upper bound (inclusive).
72       */
73      private IndexSet(int left, int right) {
74          this.left = left;
75          this.right = right;
76          // Allocate storage to store index==right
77          // Note: This may allow directly writing to index > right if there
78          // is extra capacity. Ranges checks to prevent this are provided by
79          // the PivotCache.add(int) method rather than using set(int).
80          data = new long[getLongIndex(right - left) + 1];
81      }
82  
83      /**
84       * Create an instance to store indices within the range {@code [left, right]}.
85       *
86       * @param left Lower bound (inclusive).
87       * @param right Upper bound (inclusive).
88       * @return the index set
89       * @throws IllegalArgumentException if {@code right < left}
90       */
91      static IndexSet ofRange(int left, int right) {
92          if (left < 0) {
93              throw new IllegalArgumentException("Invalid lower index: " + left);
94          }
95          checkRange(left, right);
96          return new IndexSet(left, right);
97      }
98  
99      /**
100      * Initialise an instance with the {@code indices}. The capacity is defined by the
101      * range required to store the minimum and maximum index.
102      *
103      * @param indices Indices.
104      * @return the index set
105      * @throws IllegalArgumentException if {@code indices.length == 0}
106      */
107     static IndexSet of(int[] indices) {
108         return of(indices, indices.length);
109     }
110 
111     /**
112      * Initialise an instance with the {@code indices}. The capacity is defined by the
113      * range required to store the minimum and maximum index.
114      *
115      * @param indices Indices.
116      * @param n Number of indices.
117      * @return the index set
118      * @throws IllegalArgumentException if {@code n == 0}
119      */
120     static IndexSet of(int[] indices, int n) {
121         if (n <= 0) {
122             throw new IllegalArgumentException("No indices to define the range");
123         }
124         int min = indices[0];
125         int max = min;
126         for (int i = 1; i < n; i++) {
127             min = Math.min(min, indices[i]);
128             max = Math.max(max, indices[i]);
129         }
130         final IndexSet set = IndexSet.ofRange(min, max);
131         for (int i = 0; i < n; i++) {
132             set.set(indices[i]);
133         }
134         return set;
135     }
136 
137     /**
138      * Return the memory footprint in bytes. This is always a multiple of 64.
139      *
140      * <p>The result is {@code 8 * ceil((right - left + 1) / 64)}.
141      *
142      * <p>This method is intended to provide information to choose if the data structure
143      * is memory efficient.
144      *
145      * <p>Warning: It is assumed {@code 0 <= left <= right}. Use with the min/max index
146      * that is to be stored.
147      *
148      * @param left Lower bound (inclusive).
149      * @param right Upper bound (inclusive).
150      * @return the memory footprint
151      */
152     static long memoryFootprint(int left, int right) {
153         return (getLongIndex(right - left) + 1L) * Long.BYTES;
154     }
155 
156     /**
157      * Gets the filter index for the specified bit index assuming the filter is using
158      * 64-bit longs to store bits starting at index 0.
159      *
160      * <p>The index is assumed to be positive. For a positive index the result will match
161      * {@code bitIndex / 64}.</p>
162      *
163      * <p><em>The divide is performed using bit shifts. If the input is negative the
164      * behavior is not defined.</em></p>
165      *
166      * @param bitIndex the bit index (assumed to be positive)
167      * @return the index of the bit map in an array of bit maps.
168      */
169     private static int getLongIndex(final int bitIndex) {
170         // An integer divide by 64 is equivalent to a shift of 6 bits if the integer is
171         // positive.
172         // We do not explicitly check for a negative here. Instead we use a
173         // signed shift. Any negative index will produce a negative value
174         // by sign-extension and if used as an index into an array it will throw an
175         // exception.
176         return bitIndex >> DIVIDE_BY_64;
177     }
178 
179     /**
180      * Gets the filter bit mask for the specified bit index assuming the filter is using
181      * 64-bit longs to store bits starting at index 0. The returned value is a
182      * {@code long} with only 1 bit set.
183      *
184      * <p>The index is assumed to be positive. For a positive index the result will match
185      * {@code 1L << (bitIndex % 64)}.</p>
186      *
187      * <p><em>If the input is negative the behavior is not defined.</em></p>
188      *
189      * @param bitIndex the bit index (assumed to be positive)
190      * @return the filter bit
191      */
192     private static long getLongBit(final int bitIndex) {
193         // Bit shifts only use the first 6 bits. Thus it is not necessary to mask this
194         // using 0x3f (63) or compute bitIndex % 64.
195         // Note: If the index is negative the shift will be (64 - (bitIndex & 0x3f)) and
196         // this will identify an incorrect bit.
197         return 1L << bitIndex;
198     }
199 
200     // Compressed cardinality methods
201 
202     /**
203      * Returns the number of bits set to {@code true} in this {@code IndexSet} using a
204      * compression of 2 to 1. This counts as enabled <em>all</em> bits of each consecutive
205      * 2 bits if <em>any</em> of the consecutive 2 bits are set to {@code true}.
206      * <pre>
207      * 0010100011000101000100
208      * 0 2 2 0 2 0 2 2 0 2 0
209      * </pre>
210      * <p>This method can be used to assess the saturation of the indices in the range.
211      *
212      * @return the number of bits set to {@code true} in this {@code IndexSet} using a
213      * compression of 2 to 1
214      */
215     public int cardinality2() {
216         int c = 0;
217         for (long x : data) {
218             // Shift and mask out the bits that were shifted
219             x = (x | (x >>> 1)) & 0b0101010101010101010101010101010101010101010101010101010101010101L;
220             // Add [0, 32]
221             c += Long.bitCount(x);
222         }
223         // Multiply by 2
224         return c << 1;
225     }
226 
227     /**
228      * Returns the number of bits set to {@code true} in this {@code IndexSet} using a
229      * compression of 4 to 1. This counts as enabled <em>all</em> bits of each consecutive
230      * 4 bits if <em>any</em> of the consecutive 4 bits are set to {@code true}.
231      * <pre>
232      * 0010000011000101000100
233      * 4   0   4   4   4   0
234      * </pre>
235      * <p>This method can be used to assess the saturation of the indices in the range.
236      *
237      * @return the number of bits set to {@code true} in this {@code IndexSet} using a compression
238      * of 8 to 1
239      */
240     public int cardinality4() {
241         int c = 0;
242         for (long x : data) {
243             // Shift powers of 2 and mask out the bits that were shifted
244             x = x | (x >>> 1);
245             x = (x | (x >>> 2)) & 0b0001000100010001000100010001000100010001000100010001000100010001L;
246             // Expect a population count intrinsic method
247             // Add [0, 16]
248             c += Long.bitCount(x);
249         }
250         // Multiply by 4
251         return c << 2;
252     }
253 
254     /**
255      * Returns the number of bits set to {@code true} in this {@code IndexSet} using a
256      * compression of 8 to 1. This counts as enabled <em>all</em> bits of each consecutive
257      * 8 bits if <em>any</em> of the consecutive 8 bits are set to {@code true}.
258      * <pre>
259      * 0010000011000101000000
260      * 8       8       0
261      * </pre>
262      * <p>This method can be used to assess the saturation of the indices in the range.
263      *
264      * @return the number of bits set to {@code true} in this {@code IndexSet} using a compression
265      * of 8 to 1
266      */
267     public int cardinality8() {
268         int c = 0;
269         for (long x : data) {
270             // Shift powers of 2 and mask out the bits that were shifted
271             x = x | (x >>> 1);
272             x = x | (x >>> 2);
273             x = (x | (x >>> 4)) & 0b0000000100000001000000010000000100000001000000010000000100000001L;
274             // Expect a population count intrinsic method
275             // Add [0, 8]
276             c += Long.bitCount(x);
277         }
278         // Multiply by 8
279         return c << 3;
280     }
281 
282     /**
283      * Returns the number of bits set to {@code true} in this {@code IndexSet} using a
284      * compression of 16 to 1. This counts as enabled <em>all</em> bits of each consecutive
285      * 16 bits if <em>any</em> of the consecutive 16 bits are set to {@code true}.
286      * <pre>
287      * 0010000011000101000000
288      * 16              0
289      * </pre>
290      * <p>This method can be used to assess the saturation of the indices in the range.
291      *
292      * @return the number of bits set to {@code true} in this {@code IndexSet} using a compression
293      * of 16 to 1
294      */
295     public int cardinality16() {
296         int c = 0;
297         for (long x : data) {
298             // Shift powers of 2 and mask out the bits that were shifted
299             x = x | (x >>> 1);
300             x = x | (x >>> 2);
301             x = x | (x >>> 4);
302             x = (x | (x >>> 8)) & 0b0000000000000001000000000000000100000000000000010000000000000001L;
303             // Count the bits using folding
304             // if x = mask:
305             // (x += (x >>> 16)) : 0000000000000001000000000000001000000000000000100000000000000010
306             // (x += (x >>> 32)) : 0000000100000001000000100000001000000011000000110000010000000100
307             x = x + (x >>> 16); // put count of each 32 bits into their lowest 2 bits
308             x = x + (x >>> 32); // put count of each 64 bits into their lowest 3 bits
309             // Add [0, 4]
310             c += (int) x & 0b111;
311         }
312         // Multiply by 16
313         return c << 4;
314     }
315 
316     /**
317      * Returns the number of bits set to {@code true} in this {@code IndexSet} using a
318      * compression of 32 to 1. This counts as enabled <em>all</em> bits of each consecutive
319      * 32 bits if <em>any</em> of the consecutive 32 bits are set to {@code true}.
320      * <pre>
321      * 0010000011000101000000
322      * 32
323      * </pre>
324      * <p>This method can be used to assess the saturation of the indices in the range.
325      *
326      * @return the number of bits set to {@code true} in this {@code IndexSet} using a compression
327      * of 32 to 1
328      */
329     public int cardinality32() {
330         int c = 0;
331         for (final long x : data) {
332             // Are any lower 32-bits or upper 32-bits set?
333             c += (int) x != 0 ? 1 : 0;
334             c += (x >>> 32) != 0L ? 1 : 0;
335         }
336         // Multiply by 32
337         return c << 5;
338     }
339 
340     /**
341      * Returns the number of bits set to {@code true} in this {@code IndexSet} using a
342      * compression of 64 to 1. This counts as enabled <em>all</em> bits of each consecutive
343      * 64 bits if <em>any</em> of the consecutive 64 bits are set to {@code true}.
344      * <pre>
345      * 0010000011000101000000
346      * 64
347      * </pre>
348      * <p>This method can be used to assess the saturation of the indices in the range.
349      *
350      * @return the number of bits set to {@code true} in this {@code IndexSet} using a compression
351      * of 64 to 1
352      */
353     public int cardinality64() {
354         int c = 0;
355         for (final long x : data) {
356             // Are any bits set?
357             c += x != 0L ? 1 : 0;
358         }
359         // Multiply by 64
360         return c << 6;
361     }
362 
363     // Adapt method API from BitSet
364 
365     /**
366      * Returns the number of bits set to {@code true} in this {@code IndexSet}.
367      *
368      * @return the number of bits set to {@code true} in this {@code IndexSet}
369      */
370     public int cardinality() {
371         int c = 0;
372         for (final long x : data) {
373             c += Long.bitCount(x);
374         }
375         return c;
376     }
377 
378     /**
379      * Returns the value of the bit with the specified index.
380      *
381      * @param bitIndex the bit index (assumed to be positive)
382      * @return the value of the bit with the specified index
383      */
384     public boolean get(int bitIndex) {
385         // WARNING: No range checks !!!
386         final int index = bitIndex - left;
387         final int i = getLongIndex(index);
388         final long m = getLongBit(index);
389         return (data[i] & m) != 0;
390     }
391 
392     /**
393      * Sets the bit at the specified index to {@code true}.
394      *
395      * <p>Warning: This has no range checks. Use {@link #add(int)} to add an index that
396      * may be outside the support.
397      *
398      * @param bitIndex the bit index (assumed to be positive)
399      */
400     public void set(int bitIndex) {
401         // WARNING: No range checks !!!
402         final int index = bitIndex - left;
403         final int i = getLongIndex(index);
404         final long m = getLongBit(index);
405         data[i] |= m;
406     }
407 
408     /**
409      * Sets the bits from the specified {@code leftIndex} (inclusive) to the specified
410      * {@code rightIndex} (inclusive) to {@code true}.
411      *
412      * <p><em>If {@code rightIndex - leftIndex < 0} the behavior is not defined.</em></p>
413      *
414      * <p>Note: In contrast to the BitSet API, this uses an <em>inclusive</em> end as this
415      * is the main use case for the class.
416      *
417      * <p>Warning: This has no range checks. Use {@link #add(int, int)} to range that
418      * may be outside the support.
419      *
420      * @param leftIndex the left index
421      * @param rightIndex the right index
422      */
423     public void set(int leftIndex, int rightIndex) {
424         final int l = leftIndex - left;
425         final int r = rightIndex - left;
426 
427         // WARNING: No range checks !!!
428         int i = getLongIndex(l);
429         final int j = getLongIndex(r);
430 
431         // Fill in bits using (big-endian mask):
432         // end      middle   start
433         // 00011111 11111111 11111100
434 
435         // start = -1L << (left % 64)
436         // end = -1L >>> (64 - ((right+1) % 64))
437         final long start = LONG_MASK << l;
438         final long end = LONG_MASK >>> -(r + 1);
439         if (i == j) {
440             // Special case where the two masks overlap at the same long index
441             // 11111100 & 00011111 => 00011100
442             data[i] |= start & end;
443         } else {
444             // 11111100
445             data[i] |= start;
446             while (++i < j) {
447                 // 11111111
448                 // Note: -1L is all bits set
449                 data[i] = -1L;
450             }
451             // 00011111
452             data[j] |= end;
453         }
454     }
455 
456     /**
457      * Returns the index of the nearest bit that is set to {@code true} that occurs on or
458      * before the specified starting index. If no such bit exists, then {@code -1} is returned.
459      *
460      * @param fromIndex Index to start checking from (inclusive).
461      * @return the index of the previous set bit, or {@code -1} if there is no such bit
462      */
463     public int previousSetBit(int fromIndex) {
464         return previousSetBitOrElse(fromIndex, -1);
465     }
466 
467     /**
468      * Returns the index of the nearest bit that is set to {@code true} that occurs on or
469      * before the specified starting index. If no such bit exists, then
470      * {@code defaultValue} is returned.
471      *
472      * @param fromIndex Index to start checking from (inclusive).
473      * @param defaultValue Default value.
474      * @return the index of the previous set bit, or {@code defaultValue} if there is no such bit
475      */
476     int previousSetBitOrElse(int fromIndex, int defaultValue) {
477         if (fromIndex < left) {
478             // index is in an unknown range
479             return defaultValue;
480         }
481         final int index = fromIndex - left;
482         int i = getLongIndex(index);
483 
484         long bits = data[i];
485 
486         // Repeat logic of get(int) to check the bit
487         if ((bits & getLongBit(index)) != 0) {
488             return fromIndex;
489         }
490 
491         // Mask bits before the bit index
492         // mask = 00011111 = -1L >>> (64 - ((index + 1) % 64))
493         bits &= LONG_MASK >>> -(index + 1);
494         for (;;) {
495             if (bits != 0) {
496                 //(i+1)       i
497                 // |  index   |
498                 // |    |     |
499                 // 0  001010000
500                 return (i + 1) * Long.SIZE - Long.numberOfLeadingZeros(bits) - 1 + left;
501             }
502             if (i == 0) {
503                 return defaultValue;
504             }
505             bits = data[--i];
506         }
507     }
508 
509     /**
510      * Returns the index of the first bit that is set to {@code true} that occurs on or
511      * after the specified starting index. If no such bit exists then {@code -1} is
512      * returned.
513      *
514      * @param fromIndex Index to start checking from (inclusive).
515      * @return the index of the next set bit, or {@code -1} if there is no such bit
516      */
517     public int nextSetBit(int fromIndex) {
518         return nextSetBitOrElse(fromIndex, -1);
519     }
520 
521     /**
522      * Returns the index of the first bit that is set to {@code true} that occurs on or
523      * after the specified starting index. If no such bit exists then {@code defaultValue} is
524      * returned.
525      *
526      * @param fromIndex Index to start checking from (inclusive).
527      * @param defaultValue Default value.
528      * @return the index of the next set bit, or {@code defaultValue} if there is no such bit
529      */
530     int nextSetBitOrElse(int fromIndex, int defaultValue) {
531         // Support searching forward through the known range
532         final int index = fromIndex < left ? 0 : fromIndex - left;
533 
534         int i = getLongIndex(index);
535 
536         // Mask bits after the bit index
537         // mask = 11111000 = -1L << (index % 64)
538         long bits = data[i] & (LONG_MASK << index);
539         for (;;) {
540             if (bits != 0) {
541                 //(i+1)       i
542                 // |    index |
543                 // |      |   |
544                 // 0  001010000
545                 return i * Long.SIZE + Long.numberOfTrailingZeros(bits) + left;
546             }
547             if (++i == data.length) {
548                 return defaultValue;
549             }
550             bits = data[i];
551         }
552     }
553 
554     /**
555      * Returns the index of the first bit that is set to {@code false} that occurs on or
556      * after the specified starting index <em>within the supported range</em>. If no such
557      * bit exists then the {@code capacity} is returned where {@code capacity = index + 1}
558      * with {@code index} the largest index that can be added to the set without an error.
559      *
560      * <p>If the starting index is less than the supported range the result is {@code fromIndex}.
561      *
562      * @param fromIndex Index to start checking from (inclusive).
563      * @return the index of the next unset bit, or the {@code capacity} if there is no such bit
564      */
565     public int nextClearBit(int fromIndex) {
566         if (fromIndex < left) {
567             return fromIndex;
568         }
569         // Support searching forward through the known range
570         final int index = fromIndex - left;
571 
572         int i = getLongIndex(index);
573 
574         // Note: This method is conceptually the same as nextSetBit with the exception
575         // that: all the data is bit-flipped; the capacity is returned when the
576         // scan reaches the end.
577 
578         // Mask bits after the bit index
579         // mask = 11111000 = -1L << (fromIndex % 64)
580         long bits = ~data[i] & (LONG_MASK << index);
581         for (;;) {
582             if (bits != 0) {
583                 return i * Long.SIZE + Long.numberOfTrailingZeros(bits) + left;
584             }
585             if (++i == data.length) {
586                 // Capacity
587                 return data.length * Long.SIZE + left;
588             }
589             bits = ~data[i];
590         }
591     }
592 
593     /**
594      * Returns the index of the first bit that is set to {@code false} that occurs on or
595      * before the specified starting index <em>within the supported range</em>. If no such
596      * bit exists then {@code -1} is returned.
597      *
598      * <p>If the starting index is less than the supported range the result is {@code fromIndex}.
599      * This can return {@code -1} only if the support begins at {@code index == 0}.
600      *
601      * @param fromIndex Index to start checking from (inclusive).
602      * @return the index of the previous unset bit, or {@code -1} if there is no such bit
603      */
604     public int previousClearBit(int fromIndex) {
605         if (fromIndex < left) {
606             // index is in an unknown range
607             return fromIndex;
608         }
609         final int index = fromIndex - left;
610         int i = getLongIndex(index);
611 
612         // Note: This method is conceptually the same as previousSetBit with the exception
613         // that: all the data is bit-flipped; the offset - 1 is returned when the
614         // scan reaches the end.
615 
616         // Mask bits before the bit index
617         // mask = 00011111 = -1L >>> (64 - ((index + 1) % 64))
618         long bits = ~data[i] & (LONG_MASK >>> -(index + 1));
619         for (;;) {
620             if (bits != 0) {
621                 //(i+1)       i
622                 // |  index   |
623                 // |    |     |
624                 // 0  001010000
625                 return (i + 1) * Long.SIZE - Long.numberOfLeadingZeros(bits) - 1 + left;
626             }
627             if (i == 0) {
628                 return left - 1;
629             }
630             bits = ~data[--i];
631         }
632     }
633 
634     /**
635      * Perform the {@code action} for each index in the set.
636      *
637      * @param action Action.
638      */
639     public void forEach(IntConsumer action) {
640         // Adapted from o.a.c.collections4.IndexProducer
641         int wordIdx = left;
642         for (int i = 0; i < data.length; i++) {
643             long bits = data[i];
644             int index = wordIdx;
645             while (bits != 0) {
646                 if ((bits & 1L) == 1L) {
647                     action.accept(index);
648                 }
649                 bits >>>= 1;
650                 index++;
651             }
652             wordIdx += Long.SIZE;
653         }
654     }
655 
656     /**
657      * Write each index in the set into the provided array.
658      * Returns the number of indices.
659      *
660      * <p>The caller must ensure the output array has sufficient capacity.
661      * For example the array used to construct the IndexSet.
662      *
663      * @param a Output array.
664      * @return count of indices
665      * @see #of(int[])
666      * @see #of(int[], int)
667      */
668     public int toArray(int[] a) {
669         // This benchmarks as faster for index sorting than toArray2 even at
670         // high density (average separation of 2).
671         int n = -1;
672         int offset = left;
673         for (long bits : data) {
674             while (bits != 0) {
675                 final int index = Long.numberOfTrailingZeros(bits);
676                 a[++n] = index + offset;
677                 bits &= ~(1L << index);
678             }
679             offset += Long.SIZE;
680         }
681         return n + 1;
682     }
683 
684     /**
685      * Write each index in the set into the provided array.
686      * Returns the number of indices.
687      *
688      * <p>The caller must ensure the output array has sufficient capacity.
689      * For example the array used to construct the IndexSet.
690      *
691      * @param a Output array.
692      * @return count of indices
693      * @see #of(int[])
694      * @see #of(int[], int)
695      */
696     public int toArray2(int[] a) {
697         // Adapted from o.a.c.collections4.IndexProducer
698         int n = -1;
699         for (int i = 0, offset = left; i < data.length; i++, offset += Long.SIZE) {
700             long bits = data[i];
701             int index = offset;
702             while (bits != 0) {
703                 if ((bits & 1L) == 1L) {
704                     a[++n] = index;
705                 }
706                 bits >>>= 1;
707                 index++;
708             }
709         }
710         return n + 1;
711     }
712 
713     // PivotCache interface
714 
715     @Override
716     public int left() {
717         return left;
718     }
719 
720     @Override
721     public int right() {
722         return right;
723     }
724 
725     @Override
726     public boolean sparse() {
727         // Can store all pivots between [left, right]
728         return false;
729     }
730 
731     @Override
732     public boolean contains(int k) {
733         // Assume [left <= k <= right]
734         return get(k);
735     }
736 
737     @Override
738     public void add(int index) {
739         // Update the floating pivots if outside the support
740         if (index < left) {
741             lowerPivot = Math.max(index, lowerPivot);
742         } else if (index > right) {
743             upperPivot = Math.min(index, upperPivot);
744         } else {
745             set(index);
746         }
747     }
748 
749     @Override
750     public void add(int fromIndex, int toIndex) {
751         // Optimisation for the main use case of the PivotCache
752         if (fromIndex == toIndex) {
753             add(fromIndex);
754             return;
755         }
756 
757         // Note:
758         // Storing all pivots allows regions of identical values
759         // and sorted regions to be skipped in subsequent partitioning.
760         // Repeat sorting these regions is typically more expensive
761         // than caching them and moving over them during partitioning.
762         // An alternative is to: store fromIndex and only store
763         // toIndex if they are well separated, optionally storing
764         // regions between. If they are not well separated (e.g. < 10)
765         // then using a single pivot is an alternative to investigate
766         // with performance benchmarks on a range of input data.
767 
768         // Pivots are required to bracket [L, R]:
769         // LP-----L--------------R------UP
770         // If the range [i, j] overlaps either L or R then
771         // the floating pivots are no longer required:
772         //     i-j                             Set lower pivot
773         //     i--------j                      Ignore lower pivot
774         //     i---------------------j         Ignore lower & upper pivots (no longer required)
775         //           i-------j                 Ignore lower & upper pivots
776         //           i---------------j         Ignore upper pivot
777         //                         i-j         Set upper pivot
778         if (fromIndex <= right && toIndex >= left) {
779             // Clip the range between [left, right]
780             final int i = Math.max(fromIndex, left);
781             final int j = Math.min(toIndex, right);
782             set(i, j);
783         } else if (toIndex < left) {
784             lowerPivot = Math.max(toIndex, lowerPivot);
785         } else {
786             // fromIndex > right
787             upperPivot = Math.min(fromIndex, upperPivot);
788         }
789     }
790 
791     @Override
792     public int previousPivot(int k) {
793         // Assume scanning in [left <= k <= right]
794         return previousSetBitOrElse(k, lowerPivot);
795     }
796 
797     @Override
798     public int nextPivotOrElse(int k, int other) {
799         // Assume scanning in [left <= k <= right]
800         final int p = upperPivot == UPPER_DEFAULT ? other : upperPivot;
801         return nextSetBitOrElse(k, p);
802     }
803 
804     // IndexInterval
805 
806     @Override
807     public int previousIndex(int k) {
808         // Re-implement previousSetBitOrElse without index checks
809         // as this supports left <= k <= right
810 
811         final int index = k - left;
812         int i = getLongIndex(index);
813 
814         // Mask bits before the bit index
815         // mask = 00011111 = -1L >>> (64 - ((index + 1) % 64))
816         long bits = data[i] & (LONG_MASK >>> -(index + 1));
817         for (;;) {
818             if (bits != 0) {
819                 //(i+1)       i
820                 // |  index   |
821                 // |    |     |
822                 // 0  001010000
823                 return (i + 1) * Long.SIZE - Long.numberOfLeadingZeros(bits) - 1 + left;
824             }
825             // Unsupported: the interval should contain k
826             //if (i == 0) {
827             //    return left - 1;
828             //}
829             bits = data[--i];
830         }
831     }
832 
833     @Override
834     public int nextIndex(int k) {
835         // Re-implement nextSetBitOrElse without index checks
836         // as this supports left <= k <= right
837 
838         final int index = k - left;
839         int i = getLongIndex(index);
840 
841         // Mask bits after the bit index
842         // mask = 11111000 = -1L << (index % 64)
843         long bits = data[i] & (LONG_MASK << index);
844         for (;;) {
845             if (bits != 0) {
846                 //(i+1)       i
847                 // |    index |
848                 // |      |   |
849                 // 0  001010000
850                 return i * Long.SIZE + Long.numberOfTrailingZeros(bits) + left;
851             }
852             // Unsupported: the interval should contain k
853             //if (++i == data.length) {
854             //    return right + 1;
855             //}
856             bits = data[++i];
857         }
858     }
859 
860     // No override for split.
861     // This requires searching for previousIndex(k - 1) and nextIndex(k + 1).
862     // The only shared code is getLongIndex(x - left). Since argument indices are 2 apart
863     // these will map to a different long with a probability of 1/32.
864 
865     // IndexInterval2
866     // This is exactly the same as IndexInterval as the pointers i are the same as the keys k
867 
868     @Override
869     public int start() {
870         return left();
871     }
872 
873     @Override
874     public int end() {
875         return right();
876     }
877 
878     @Override
879     public int index(int i) {
880         return i;
881     }
882 
883     @Override
884     public int previous(int i, int k) {
885         return previousIndex(k);
886     }
887 
888     @Override
889     public int next(int i, int k) {
890         return nextIndex(k);
891     }
892 
893     /**
894      * Return a {@link ScanningPivotCache} implementation re-using the same internal storage.
895      *
896      * <p>Note that the range for the {@link ScanningPivotCache} must fit inside the current
897      * supported range of indices.
898      *
899      * <p>Warning: This operation clears all set bits within the range.
900      *
901      * <p><strong>Support</strong>
902      *
903      * <p>The returned {@link ScanningPivotCache} is suitable for storing all pivot points between
904      * {@code [left, right]} and the closest bounding pivots outside that range. It can be
905      * used for bracketing partition keys processed in a random order by storing pivots
906      * found during each successive partition search.
907      *
908      * <p>The returned {@link ScanningPivotCache} is suitable for use when iterating over
909      * partition keys in ascending order.
910      *
911      * <p>The cache allows incrementing the {@code left} support using
912      * {@link ScanningPivotCache#moveLeft(int)}. Any calls to decrement the {@code left} support
913      * at any time will result in an {@link UnsupportedOperationException}; this prevents reseting
914      * to within the original support used to create the cache. If the {@code left} is
915      * moved beyond the {@code right} then the move is rejected.
916      *
917      * @param lower Lower bound (inclusive).
918      * @param upper Upper bound (inclusive).
919      * @return the pivot cache
920      * @throws IllegalArgumentException if {@code right < left} or the range cannot be
921      * supported.
922      */
923     ScanningPivotCache asScanningPivotCache(int lower, int upper) {
924         return asScanningPivotCache(lower, upper, true);
925     }
926 
927     /**
928      * Return a {@link ScanningPivotCache} implementation to support the range
929      * {@code [left, right]}.
930      *
931      * <p>See {@link #asScanningPivotCache(int, int)} for the details of the cache implementation.
932      *
933      * @param left Lower bound (inclusive).
934      * @param right Upper bound (inclusive).
935      * @return the pivot cache
936      * @throws IllegalArgumentException if {@code right < left}
937      */
938     static ScanningPivotCache createScanningPivotCache(int left, int right) {
939         final IndexSet set = ofRange(left, right);
940         return set.asScanningPivotCache(left, right, false);
941     }
942 
943     /**
944      * Return a {@link ScanningPivotCache} implementation to support the range
945      * {@code [left, right]} re-using the same internal storage.
946      *
947      * @param lower Lower bound (inclusive).
948      * @param upper Upper bound (inclusive).
949      * @param initialize Perform validation checks and initialize the storage.
950      * @return the pivot cache
951      * @throws IllegalArgumentException if {@code right < left} or the range cannot be
952      * supported.
953      */
954     private ScanningPivotCache asScanningPivotCache(int lower, int upper, boolean initialize) {
955         if (initialize) {
956             checkRange(lower, upper);
957             final int capacity = data.length * Long.SIZE + lower;
958             if (lower < left || upper >= capacity) {
959                 throw new IllegalArgumentException(
960                     String.format("Unsupported range: [%d, %d] is not within [%d, %d]", lower, upper,
961                         left, capacity - 1));
962             }
963             // Clear existing data
964             Arrays.fill(data, 0);
965         }
966         return new IndexPivotCache(lower, upper);
967     }
968 
969     /**
970      * Return an {@link UpdatingInterval} implementation to support the range
971      * {@code [left, right]} re-using the same internal storage.
972      *
973      * @return the interval
974      */
975     UpdatingInterval interval() {
976         return new IndexSetUpdatingInterval(left, right);
977     }
978 
979     /**
980      * Check the range is valid.
981      *
982      * @param left Lower bound (inclusive).
983      * @param right Upper bound (inclusive).
984      * @throws IllegalArgumentException if {@code right < left}
985      */
986     private static void checkRange(int left, int right) {
987         if (right < left) {
988             throw new IllegalArgumentException(
989                 String.format("Invalid range: [%d, %d]", left, right));
990         }
991     }
992 
993     /**
994      * Implementation of the {@link ScanningPivotCache} using the {@link IndexSet}.
995      *
996      * <p>Stores all pivots between the support {@code [left, right]}. Uses two
997      * floating pivots which are the closest known pivots surrounding this range.
998      *
999      * <p>This class is bound to the enclosing {@link IndexSet} instance to provide
1000      * the functionality to read, write and search indexes.
1001      *
1002      * <p>Note: This duplicates functionality of the parent IndexSet. Differences
1003      * are that it uses a movable left bound and implements the scanning functionality
1004      * of the {@link ScanningPivotCache} interface. It can also be created for
1005      * a smaller {@code [left, right]} range than the enclosing class.
1006      *
1007      * <p>Creation of this class typically invalidates the use of the outer class.
1008      * Creation will zero the underlying storage and the range may be different.
1009      */
1010     private class IndexPivotCache implements ScanningPivotCache {
1011         /** Left bound of the support. */
1012         private int left;
1013         /** Right bound of the support. */
1014         private final int right;
1015         /** The upstream pivot closest to the left bound of the support.
1016          * Provides a lower search bound for the range [left, right]. */
1017         private int lowerPivot = -1;
1018         /** The downstream pivot closest to the right bound of the support.
1019          * Provides an upper search bound for the range [left, right]. */
1020         private int upperPivot = UPPER_DEFAULT;
1021 
1022         /**
1023          * @param left Lower bound (inclusive).
1024          * @param right Upper bound (inclusive).
1025          */
1026         IndexPivotCache(int left, int right) {
1027             this.left = left;
1028             this.right = right;
1029         }
1030 
1031         @Override
1032         public int left() {
1033             return left;
1034         }
1035 
1036         @Override
1037         public int right() {
1038             return right;
1039         }
1040 
1041         @Override
1042         public boolean sparse() {
1043             // Can store all pivots between [left, right]
1044             return false;
1045         }
1046 
1047         @Override
1048         public boolean moveLeft(int newLeft) {
1049             if (newLeft > right) {
1050                 // Signal that this cache can no longer be used in that range
1051                 return false;
1052             }
1053             if (newLeft < left) {
1054                 throw new UnsupportedOperationException(
1055                     String.format("New left is outside current support: %d < %d", newLeft, left));
1056             }
1057             // Here [left <= newLeft <= right]
1058             // Move the upstream pivot
1059             lowerPivot = previousPivot(newLeft);
1060             left = newLeft;
1061             return true;
1062         }
1063 
1064         @Override
1065         public boolean contains(int k) {
1066             // Assume [left <= k <= right]
1067             return IndexSet.this.get(k);
1068         }
1069 
1070         @Override
1071         public int previousPivot(int k) {
1072             // Assume scanning in [left <= k <= right]
1073             // Here left is moveable and lower pivot holds the last pivot below it.
1074             // The cache will not store any bits below left so if it has moved
1075             // searching may find stale bits below the current lower pivot.
1076             // So we return the max of the found bit or the lower pivot.
1077             if (k < left) {
1078                 return lowerPivot;
1079             }
1080             return Math.max(lowerPivot, IndexSet.this.previousSetBitOrElse(k, lowerPivot));
1081         }
1082 
1083         @Override
1084         public int nextPivotOrElse(int k, int other) {
1085             // Assume scanning in [left <= k <= right]
1086             final int p = upperPivot == UPPER_DEFAULT ? other : upperPivot;
1087             return IndexSet.this.nextSetBitOrElse(k, p);
1088         }
1089 
1090         @Override
1091         public int nextNonPivot(int k) {
1092             // Assume scanning in [left <= k <= right]
1093             return IndexSet.this.nextClearBit(k);
1094         }
1095 
1096         @Override
1097         public int previousNonPivot(int k) {
1098             // Assume scanning in [left <= k <= right]
1099             return IndexSet.this.previousClearBit(k);
1100         }
1101 
1102         @Override
1103         public void add(int index) {
1104             // Update the floating pivots if outside the support
1105             if (index < left) {
1106                 lowerPivot = Math.max(index, lowerPivot);
1107             } else if (index > right) {
1108                 upperPivot = Math.min(index, upperPivot);
1109             } else {
1110                 IndexSet.this.set(index);
1111             }
1112         }
1113 
1114         @Override
1115         public void add(int fromIndex, int toIndex) {
1116             if (fromIndex == toIndex) {
1117                 add(fromIndex);
1118                 return;
1119             }
1120             // Note:
1121             // Storing all pivots allows regions of identical values
1122             // and sorted regions to be skipped in subsequent partitioning.
1123             // Repeat sorting these regions is typically more expensive
1124             // than caching them and moving over them during partitioning.
1125             // An alternative is to: store fromIndex and only store
1126             // toIndex if they are well separated, optionally storing
1127             // regions between. If they are not well separated (e.g. < 10)
1128             // then using a single pivot is an alternative to investigate
1129             // with performance benchmarks on a range of input data.
1130 
1131             // Pivots are required to bracket [L, R]:
1132             // LP-----L--------------R------UP
1133             // If the range [i, j] overlaps either L or R then
1134             // the floating pivots are no longer required:
1135             //     i-j                             Set lower pivot
1136             //     i--------j                      Ignore lower pivot
1137             //     i---------------------j         Ignore lower & upper pivots (no longer required)
1138             //           i-------j                 Ignore lower & upper pivots
1139             //           i---------------j         Ignore upper pivot
1140             //                         i-j         Set upper pivot
1141             if (fromIndex <= right && toIndex >= left) {
1142                 // Clip the range between [left, right]
1143                 final int i = Math.max(fromIndex, left);
1144                 final int j = Math.min(toIndex, right);
1145                 IndexSet.this.set(i, j);
1146             } else if (toIndex < left) {
1147                 lowerPivot = Math.max(toIndex, lowerPivot);
1148             } else {
1149                 // fromIndex > right
1150                 upperPivot = Math.min(fromIndex, upperPivot);
1151             }
1152         }
1153     }
1154 
1155     /**
1156      * Implementation of the {@link UpdatingInterval} using the {@link IndexSet}.
1157      *
1158      * <p>This class is bound to the enclosing {@link IndexSet} instance to provide
1159      * the functionality to search indexes.
1160      */
1161     private class IndexSetUpdatingInterval implements UpdatingInterval {
1162         /** Left bound of the interval. */
1163         private int left;
1164         /** Right bound of the interval. */
1165         private int right;
1166 
1167         /**
1168          * @param left Lower bound (inclusive).
1169          * @param right Upper bound (inclusive).
1170          */
1171         IndexSetUpdatingInterval(int left, int right) {
1172             this.left = left;
1173             this.right = right;
1174         }
1175 
1176         @Override
1177         public int left() {
1178             return left;
1179         }
1180 
1181         @Override
1182         public int right() {
1183             return right;
1184         }
1185 
1186         @Override
1187         public int updateLeft(int k) {
1188             // Assume left < k= < right
1189             return left = nextIndex(k);
1190         }
1191 
1192         @Override
1193         public int updateRight(int k) {
1194             // Assume left <= k < right
1195             return right = previousIndex(k);
1196         }
1197 
1198         @Override
1199         public UpdatingInterval splitLeft(int ka, int kb) {
1200             // Assume left < ka <= kb < right
1201             final int lower = left;
1202             left = nextIndex(kb + 1);
1203             return new IndexSetUpdatingInterval(lower, previousIndex(ka - 1));
1204         }
1205 
1206         @Override
1207         public UpdatingInterval splitRight(int ka, int kb) {
1208             // Assume left < ka <= kb < right
1209             final int upper = right;
1210             right = previousIndex(ka - 1);
1211             return new IndexSetUpdatingInterval(nextIndex(kb + 1), upper);
1212         }
1213     }
1214 }