1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.numbers.arrays;
19
20 /**
21 * Partition array data.
22 *
23 * <p>Note: Requires that the floating-point data contains no NaN values; sorting does not
24 * respect the order of signed zeros imposed by {@link Double#compare(double, double)};
25 * mixed signed zeros may be destroyed (the mixture updated during partitioning). The
26 * caller is responsible for counting a mixture of signed zeros and restoring them if
27 * required.
28 *
29 * @see Selection
30 * @since 1.2
31 */
32 final class QuickSelect {
33 // Implementation Notes
34 //
35 // Selection is performed using a quickselect variant to recursively divide the range
36 // to select the target index, or indices. Partition sizes or recursion are monitored
37 // will fall-backs on poor convergence of a linearselect (single index) or heapselect.
38 //
39 // Many implementations were tested, each with strengths and weaknesses on different
40 // input data containing random elements, repeat elements, elements with repeat
41 // patterns, and constant elements. The final implementation performs well across data
42 // types for single and multiple indices with no obvious weakness.
43 // See: o.a.c.numbers.examples.jmh.arrays for benchmarking implementations.
44 //
45 // Single indices are selected using a quickselect adaptive method based on Alexandrescu.
46 // The algorithm is a quickselect around a pivot identified using a
47 // sample-of-sample-of-samples created from the entire range data. This pivot will
48 // have known lower and upper margins and ensures elimination of a minimum fraction of
49 // data at each step. To increase speed the pivot can be identified using less of the data
50 // but without margin guarantees (sampling mode). The algorithm monitors partition sizes
51 // against the known margins. If the reduction in the partition size is not large enough
52 // then the algorithm can disable sampling mode and ensure linear performance by removing
53 // a set fraction of the data each iteration.
54 //
55 // Modifications from Alexandrescu are:
56 // 1. Initialise sampling mode using the Floyd-Rivest (FR) SELECT algorithm.
57 // 2. Adaption is adjusted to force use of the lower margin in the far-step method when
58 // sampling is disabled.
59 // 3. Change the far-step method to a min-of-4 then median-of-3 into the 2nd 12th-tile.
60 // The original method uses a lower-median-of-4, min-of-3 into the 4th 12th-tile.
61 // 4. Position the sample around the target k when in sampling mode for the non-far-step
62 // methods.
63 //
64 // The far step method is used when target k is within 1/12 of the end of the data A.
65 // The differences in the far-step method are:
66 // - The upper margin when not sampling is 8/24 vs. 9/24; the lower margin remains at 1/12.
67 // - The position of the sample is closer to the expected location of k < |A|/12.
68 // - Sampling mode uses a median-of-3 with adaptive k, matching the other step methods.
69 // Note the original min-of-3 sample is more likely to create a pivot too small if used
70 // with adaption leaving k in the larger partition and a wasted iteration.
71 //
72 // The Floyd-Rivest (FR) SELECT algorithm is preferred for sampling over using quickselect
73 // adaptive sampling. It uses a smaller sample and has improved heuristics to place the sample
74 // pivot. However the FR sample is a small range of the data and pivot selection can be poor
75 // if the sample is not representative. This can be mitigated by creating a random sample
76 // of the entire range for the pivot selection. This implementation does not use random
77 // sampling for the FR mode. Performance is identical on random data (randomisation is a
78 // negligible overhead) and faster on sorted data. Any data not suitable for the FR algorithm
79 // are immediately switched to the quickselect adaptive algorithm with sampling. Performance
80 // across a range of data shows this strategy is approximately mid-way in performance between
81 // FR with random sampling, and quickselect adaptive in sampling mode. The benefit is that
82 // sorted or partially partitioned data are not intentionally unordered as the method will
83 // only move elements known to be incorrectly placed in the array.
84 //
85 // Multiple indices are selected using a dual-pivot partition method by
86 // Yaroslavskiy to divide the interval containing the indices. Recursion is performed for
87 // regions containing target indices. The method is thus a partial quicksort into regions of
88 // interest. Excess recursion triggers use of a heapselect. When indices are effectively
89 // a single index the method can switch to the single index selection to use the FR algorithm.
90 //
91 // Alternative schemes to partition multiple indices are to repeat call single index select
92 // with cached pivots, or without cached pivots if processing indices in order as the previous
93 // index brackets the range for the next search. Caching pivots is the most effective
94 // alternative. It requires storing all pivots during select, and using the cache to look-up
95 // the search bounds (sub-range) for each target index. This requires 2n searches for n indices.
96 // All pivots must be stored to avoid destroying previously partitioned data on repeat entry
97 // to the array. The current scheme inverts this by requiring at most n-1 divides of the
98 // indices during recursion and has the advantage of tracking recursion depth during selection
99 // for each sub-range. Division of indices is a small overhead for the common case where
100 // the number of indices is far smaller than the size of the data.
101 //
102 // Dual-pivot partitioning adapted from Yaroslavskiy
103 // http://codeblab.com/wp-content/uploads/2009/09/DualPivotQuicksort.pdf
104 //
105 // Modified to allow partial sorting (partitioning):
106 // - Ignore insertion sort for tiny array (handled by calling code).
107 // - Ignore recursive calls for a full sort (handled by calling code).
108 // - Change to fast-forward over initial ascending / descending runs.
109 // - Change to fast-forward great when v > v2 and either break the sorting
110 // loop, or move a[great] direct to the correct location.
111 // - Change to use the 2nd and 4th of 5 elements for the pivots.
112 // - Identify a large central region using ~5/8 of the length to trigger search for
113 // equal values.
114 //
115 // For some indices and data a full sort of the data will be faster; this is impossible to
116 // predict on unknown data input and attempts to analyse the indices and data impact
117 // performance for the majority of use cases where sorting is not a suitable choice.
118 // Use of the sortselect finisher allows the current multiple indices method to degrade
119 // to a (non-optimised) dual-pivot quicksort (see below).
120 //
121 // heapselect vs sortselect
122 //
123 // Quickselect can switch to an alternative when: the range is very small
124 // (e.g. insertion sort); or the target index is close to the end (e.g. heapselect).
125 // Small ranges and a target index close to the end are handled using a hybrid of insertion
126 // sort and selection (sortselect). This is faster than heapselect for small distance from
127 // the edge (m) for a single index and has the advantage of sorting all upstream values from
128 // the target index (heapselect requires push-down of each successive value to sort). This
129 // allows the dual-pivot quickselect on multiple indices that saturate the range to degrade
130 // to a (non-optimised) dual-pivot quicksort. However sortselect is worst case Order(m * (r-l))
131 // for range [l, r] so cannot be used when quickselect fails to converge as m may be very large.
132 // Thus heapselect is used as the stopper algorithm when quickselect progress is slow on
133 // multiple indices. If heapselect is used for small range handling the performance on
134 // saturated indices is significantly slower. Hence the presence of two final selection
135 // methods for different purposes.
136
137 /** Sampling mode using Floyd-Rivest sampling. */
138 static final int MODE_FR_SAMPLING = -1;
139 /** Sampling mode. */
140 static final int MODE_SAMPLING = 0;
141 /** No sampling but use adaption of the target k. */
142 static final int MODE_ADAPTION = 1;
143 /** No sampling and no adaption of target k (strict margins). */
144 static final int MODE_STRICT = 2;
145
146 /** Minimum size for sortselect.
147 * Below this perform a sort rather than selection. This is used to avoid
148 * sort select on tiny data. */
149 private static final int MIN_SORTSELECT_SIZE = 4;
150 /** Single-pivot sortselect size for quickselect adaptive. Note that quickselect adaptive
151 * recursively calls quickselect so very small lengths are included with an initial medium
152 * length. Using lengths of 1023-5 and 2043-53 indicate optimum performance around 20-30.
153 * Note: The expand partition function assumes a sample of at least length 2 as each end
154 * of the sample is used as a sentinel; this imposes a minimum length of 24 on the range
155 * to ensure it contains a 12-th tile of length 2. Thus the absolute minimum for the
156 * distance from the edge is 12. */
157 private static final int LINEAR_SORTSELECT_SIZE = 24;
158 /** Dual-pivot sortselect size for the distance of a single k from the edge of the
159 * range length n. Benchmarking in range [81+81, 243+243] suggests a value of ~20 (or
160 * higher on some hardware). Ranges are chosen based on third interval spacing between
161 * powers of 3.
162 *
163 * <p>Sortselect is faster at this small size than heapselect. A second advantage is
164 * that all indices closer to the edge than the target index are also sorted. This
165 * allows selection of multiple close indices to be performed with effectively the
166 * same speed. High density indices will result in recursion to very short fragments
167 * which also trigger use of sort select. The threshold for sorting short lengths is
168 * configured in {@link #dualPivotSortSelectSize(int, int)}. */
169 private static final int DP_SORTSELECT_SIZE = 20;
170 /** Threshold to use Floyd-Rivest sub-sampling. This partitions a sample of the data to
171 * identify a pivot so that the target element is in the smaller set after partitioning.
172 * The original FR paper used 600 otherwise reverted to the target index as the pivot.
173 * This implementation reverts to quickselect adaptive which increases robustness
174 * at small size on a variety of data and allows raising the original FR threshold. */
175 private static final int FR_SAMPLING_SIZE = 1200;
176
177 /** Increment used for the recursion counter. The counter will overflow to negative when
178 * recursion has exceeded the maximum level. The counter is maintained in the upper bits
179 * of the dual-pivot control flags. */
180 private static final int RECURSION_INCREMENT = 1 << 20;
181 /** Mask to extract the sort select size from the dual-pivot control flags. Currently
182 * the bits below those used for the recursion counter are only used for the sort select size
183 * so this can use a mask with all bits below the increment. */
184 private static final int SORTSELECT_MASK = RECURSION_INCREMENT - 1;
185
186 /** Threshold to use repeated step left: 7 / 16. */
187 private static final double STEP_LEFT = 0.4375;
188 /** Threshold to use repeated step right: 9 / 16. */
189 private static final double STEP_RIGHT = 0.5625;
190 /** Threshold to use repeated step far-left: 1 / 12. */
191 private static final double STEP_FAR_LEFT = 0.08333333333333333;
192 /** Threshold to use repeated step far-right: 11 / 12. */
193 private static final double STEP_FAR_RIGHT = 0.9166666666666666;
194
195 /** No instances. */
196 private QuickSelect() {}
197
198 /**
199 * Partition the elements between {@code ka} and {@code kb} using a heap select
200 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
201 *
202 * @param a Data array to use to find out the K<sup>th</sup> value.
203 * @param left Lower bound (inclusive).
204 * @param right Upper bound (inclusive).
205 * @param ka Lower index to select.
206 * @param kb Upper index to select.
207 */
208 static void heapSelect(double[] a, int left, int right, int ka, int kb) {
209 if (right <= left) {
210 return;
211 }
212 // Use the smallest heap
213 if (kb - left < right - ka) {
214 heapSelectLeft(a, left, right, ka, kb);
215 } else {
216 heapSelectRight(a, left, right, ka, kb);
217 }
218 }
219
220 /**
221 * Partition the elements between {@code ka} and {@code kb} using a heap select
222 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
223 *
224 * <p>For best performance this should be called with {@code k} in the lower
225 * half of the range.
226 *
227 * @param a Data array to use to find out the K<sup>th</sup> value.
228 * @param left Lower bound (inclusive).
229 * @param right Upper bound (inclusive).
230 * @param ka Lower index to select.
231 * @param kb Upper index to select.
232 */
233 static void heapSelectLeft(double[] a, int left, int right, int ka, int kb) {
234 // Create a max heap in-place in [left, k], rooted at a[left] = max
235 // |l|-max-heap-|k|--------------|
236 // Build the heap using Floyd's heap-construction algorithm for heap size n.
237 // Start at parent of the last element in the heap (k),
238 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = k - left
239 int end = kb + 1;
240 for (int p = left + ((kb - left - 1) >> 1); p >= left; p--) {
241 maxHeapSiftDown(a, a[p], p, left, end);
242 }
243 // Scan the remaining data and insert
244 // Mitigate worst case performance on descending data by backward sweep
245 double max = a[left];
246 for (int i = right + 1; --i > kb;) {
247 final double v = a[i];
248 if (v < max) {
249 a[i] = max;
250 maxHeapSiftDown(a, v, left, left, end);
251 max = a[left];
252 }
253 }
254 // Partition [ka, kb]
255 // |l|-max-heap-|k|--------------|
256 // | <-swap-> | then sift down reduced size heap
257 // Avoid sifting heap of size 1
258 final int last = Math.max(left, ka - 1);
259 while (--end > last) {
260 maxHeapSiftDown(a, a[end], left, left, end);
261 a[end] = max;
262 max = a[left];
263 }
264 }
265
266 /**
267 * Sift the element down the max heap.
268 *
269 * <p>Assumes {@code root <= p < end}, i.e. the max heap is above root.
270 *
271 * @param a Heap data.
272 * @param v Value to sift.
273 * @param p Start position.
274 * @param root Root of the heap.
275 * @param end End of the heap (exclusive).
276 */
277 private static void maxHeapSiftDown(double[] a, double v, int p, int root, int end) {
278 // child2 = root + 2 * (parent - root) + 2
279 // = 2 * parent - root + 2
280 while (true) {
281 // Right child
282 int c = (p << 1) - root + 2;
283 if (c > end) {
284 // No left child
285 break;
286 }
287 // Use the left child if right doesn't exist, or it is greater
288 if (c == end || a[c] < a[c - 1]) {
289 --c;
290 }
291 if (v >= a[c]) {
292 // Parent greater than largest child - done
293 break;
294 }
295 // Swap and descend
296 a[p] = a[c];
297 p = c;
298 }
299 a[p] = v;
300 }
301
302 /**
303 * Partition the elements between {@code ka} and {@code kb} using a heap select
304 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
305 *
306 * <p>For best performance this should be called with {@code k} in the upper
307 * half of the range.
308 *
309 * @param a Data array to use to find out the K<sup>th</sup> value.
310 * @param left Lower bound (inclusive).
311 * @param right Upper bound (inclusive).
312 * @param ka Lower index to select.
313 * @param kb Upper index to select.
314 */
315 static void heapSelectRight(double[] a, int left, int right, int ka, int kb) {
316 // Create a min heap in-place in [k, right], rooted at a[right] = min
317 // |--------------|k|-min-heap-|r|
318 // Build the heap using Floyd's heap-construction algorithm for heap size n.
319 // Start at parent of the last element in the heap (k),
320 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = right - k
321 int end = ka - 1;
322 for (int p = right - ((right - ka - 1) >> 1); p <= right; p++) {
323 minHeapSiftDown(a, a[p], p, right, end);
324 }
325 // Scan the remaining data and insert
326 // Mitigate worst case performance on descending data by backward sweep
327 double min = a[right];
328 for (int i = left - 1; ++i < ka;) {
329 final double v = a[i];
330 if (v > min) {
331 a[i] = min;
332 minHeapSiftDown(a, v, right, right, end);
333 min = a[right];
334 }
335 }
336 // Partition [ka, kb]
337 // |--------------|k|-min-heap-|r|
338 // | <-swap-> | then sift down reduced size heap
339 // Avoid sifting heap of size 1
340 final int last = Math.min(right, kb + 1);
341 while (++end < last) {
342 minHeapSiftDown(a, a[end], right, right, end);
343 a[end] = min;
344 min = a[right];
345 }
346 }
347
348 /**
349 * Sift the element down the min heap.
350 *
351 * <p>Assumes {@code root >= p > end}, i.e. the max heap is below root.
352 *
353 * @param a Heap data.
354 * @param v Value to sift.
355 * @param p Start position.
356 * @param root Root of the heap.
357 * @param end End of the heap (exclusive).
358 */
359 private static void minHeapSiftDown(double[] a, double v, int p, int root, int end) {
360 // child2 = root - 2 * (root - parent) - 2
361 // = 2 * parent - root - 2
362 while (true) {
363 // Right child
364 int c = (p << 1) - root - 2;
365 if (c < end) {
366 // No left child
367 break;
368 }
369 // Use the left child if right doesn't exist, or it is less
370 if (c == end || a[c] > a[c + 1]) {
371 ++c;
372 }
373 if (v <= a[c]) {
374 // Parent less than smallest child - done
375 break;
376 }
377 // Swap and descend
378 a[p] = a[c];
379 p = c;
380 }
381 a[p] = v;
382 }
383
384 /**
385 * Partition the elements between {@code ka} and {@code kb} using a sort select
386 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
387 *
388 * @param a Data array to use to find out the K<sup>th</sup> value.
389 * @param left Lower bound (inclusive).
390 * @param right Upper bound (inclusive).
391 * @param ka Lower index to select.
392 * @param kb Upper index to select.
393 */
394 static void sortSelect(double[] a, int left, int right, int ka, int kb) {
395 // Combine the test for right <= left with
396 // avoiding the overhead of sort select on tiny data.
397 if (right - left <= MIN_SORTSELECT_SIZE) {
398 Sorting.sort(a, left, right);
399 return;
400 }
401 // Sort the smallest side
402 if (kb - left < right - ka) {
403 sortSelectLeft(a, left, right, kb);
404 } else {
405 sortSelectRight(a, left, right, ka);
406 }
407 }
408
409 /**
410 * Partition the minimum {@code n} elements below {@code k} where
411 * {@code n = k - left + 1}. Uses an insertion sort algorithm.
412 *
413 * <p>Works with any {@code k} in the range {@code left <= k <= right}
414 * and performs a full sort of the range below {@code k}.
415 *
416 * <p>For best performance this should be called with
417 * {@code k - left < right - k}, i.e.
418 * to partition a value in the lower half of the range.
419 *
420 * @param a Data array to use to find out the K<sup>th</sup> value.
421 * @param left Lower bound (inclusive).
422 * @param right Upper bound (inclusive).
423 * @param k Index to select.
424 */
425 static void sortSelectLeft(double[] a, int left, int right, int k) {
426 // Sort
427 for (int i = left; ++i <= k;) {
428 final double v = a[i];
429 // Move preceding higher elements above (if required)
430 if (v < a[i - 1]) {
431 int j = i;
432 while (--j >= left && v < a[j]) {
433 a[j + 1] = a[j];
434 }
435 a[j + 1] = v;
436 }
437 }
438 // Scan the remaining data and insert
439 // Mitigate worst case performance on descending data by backward sweep
440 double m = a[k];
441 for (int i = right + 1; --i > k;) {
442 final double v = a[i];
443 if (v < m) {
444 a[i] = m;
445 int j = k;
446 while (--j >= left && v < a[j]) {
447 a[j + 1] = a[j];
448 }
449 a[j + 1] = v;
450 m = a[k];
451 }
452 }
453 }
454
455 /**
456 * Partition the maximum {@code n} elements above {@code k} where
457 * {@code n = right - k + 1}. Uses an insertion sort algorithm.
458 *
459 * <p>Works with any {@code k} in the range {@code left <= k <= right}
460 * and can be used to perform a full sort of the range above {@code k}.
461 *
462 * <p>For best performance this should be called with
463 * {@code k - left > right - k}, i.e.
464 * to partition a value in the upper half of the range.
465 *
466 * @param a Data array to use to find out the K<sup>th</sup> value.
467 * @param left Lower bound (inclusive).
468 * @param right Upper bound (inclusive).
469 * @param k Index to select.
470 */
471 static void sortSelectRight(double[] a, int left, int right, int k) {
472 // Sort
473 for (int i = right; --i >= k;) {
474 final double v = a[i];
475 // Move succeeding lower elements below (if required)
476 if (v > a[i + 1]) {
477 int j = i;
478 while (++j <= right && v > a[j]) {
479 a[j - 1] = a[j];
480 }
481 a[j - 1] = v;
482 }
483 }
484 // Scan the remaining data and insert
485 // Mitigate worst case performance on descending data by backward sweep
486 double m = a[k];
487 for (int i = left - 1; ++i < k;) {
488 final double v = a[i];
489 if (v > m) {
490 a[i] = m;
491 int j = k;
492 while (++j <= right && v > a[j]) {
493 a[j - 1] = a[j];
494 }
495 a[j - 1] = v;
496 m = a[k];
497 }
498 }
499 }
500
501 /**
502 * Partition the array such that index {@code k} corresponds to its correctly
503 * sorted value in the equivalent fully sorted array.
504 *
505 * <p>Assumes {@code k} is a valid index into [left, right].
506 *
507 * @param a Values.
508 * @param left Lower bound of data (inclusive).
509 * @param right Upper bound of data (inclusive).
510 * @param k Index.
511 */
512 static void select(double[] a, int left, int right, int k) {
513 quickSelectAdaptive(a, left, right, k, k, new int[1], MODE_FR_SAMPLING);
514 }
515
516 /**
517 * Partition the array such that indices {@code k} correspond to their correctly
518 * sorted value in the equivalent fully sorted array.
519 *
520 * <p>The count of the number of used indices is returned. If the keys are sorted in-place,
521 * the count is returned as a negative.
522 *
523 * @param a Values.
524 * @param left Lower bound of data (inclusive).
525 * @param right Upper bound of data (inclusive).
526 * @param k Indices (may be destructively modified).
527 * @param n Count of indices.
528 * @return the count of used indices
529 * @throws IndexOutOfBoundsException if any index {@code k} is not within the
530 * sub-range {@code [left, right]}
531 */
532 static int select(double[] a, int left, int right, int[] k, int n) {
533 if (n < 1) {
534 return 0;
535 }
536 if (n == 1) {
537 quickSelectAdaptive(a, left, right, k[0], k[0], new int[1], MODE_FR_SAMPLING);
538 return -1;
539 }
540
541 // Interval creation validates the indices are in [left, right]
542 final UpdatingInterval keys = IndexSupport.createUpdatingInterval(left, right, k, n);
543
544 // Save number of used indices
545 final int count = IndexSupport.countIndices(keys, n);
546
547 // Note: If the keys are not separated then they are effectively a single key.
548 // Any split of keys separated by the sort select size
549 // will be finished on the next iteration.
550 final int k1 = keys.left();
551 final int kn = keys.right();
552 if (kn - k1 < DP_SORTSELECT_SIZE) {
553 quickSelectAdaptive(a, left, right, k1, kn, new int[1], MODE_FR_SAMPLING);
554 } else {
555 // Dual-pivot mode with small range sort length configured using index density
556 dualPivotQuickSelect(a, left, right, keys, dualPivotFlags(left, right, k1, kn));
557 }
558 return count;
559 }
560
561 /**
562 * Partition the array such that indices {@code k} correspond to their
563 * correctly sorted value in the equivalent fully sorted array.
564 *
565 * <p>For all indices {@code [ka, kb]} and any index {@code i}:
566 *
567 * <pre>{@code
568 * data[i < ka] <= data[ka] <= data[kb] <= data[kb < i]
569 * }</pre>
570 *
571 * <p>This function accepts indices {@code [ka, kb]} that define the
572 * range of indices to partition. It is expected that the range is small.
573 *
574 * <p>The {@code flags} are used to control the sampling mode and adaption of
575 * the index within the sample.
576 *
577 * <p>Returns the bounds containing {@code [ka, kb]}. These may be lower/higher
578 * than the keys if equal values are present in the data.
579 *
580 * @param a Values.
581 * @param left Lower bound of data (inclusive, assumed to be strictly positive).
582 * @param right Upper bound of data (inclusive, assumed to be strictly positive).
583 * @param ka First key of interest.
584 * @param kb Last key of interest.
585 * @param bounds Upper bound of the range containing {@code [ka, kb]} (inclusive).
586 * @param flags Adaption flags.
587 * @return Lower bound of the range containing {@code [ka, kb]} (inclusive).
588 */
589 static int quickSelectAdaptive(double[] a, int left, int right, int ka, int kb,
590 int[] bounds, int flags) {
591 int l = left;
592 int r = right;
593 int m = flags;
594 while (true) {
595 // Select when ka and kb are close to the same end
596 // |l|-----|ka|kkkkkkkk|kb|------|r|
597 if (Math.min(kb - l, r - ka) < LINEAR_SORTSELECT_SIZE) {
598 sortSelect(a, l, r, ka, kb);
599 bounds[0] = kb;
600 return ka;
601 }
602
603 // Only target ka; kb is assumed to be close
604 int p0;
605 final int n = r - l;
606 // f in [0, 1]
607 final double f = (double) (ka - l) / n;
608 // Record the larger margin (start at 1/4) to create the estimated size.
609 // step L R
610 // far left 1/12 1/3 (use 1/4 + 1/32 + 1/64 ~ 0.328)
611 // left 1/6 1/4
612 // middle 2/9 2/9 (use 1/4 - 1/32 ~ 0.219)
613 int margin = n >> 2;
614 if (m < MODE_SAMPLING && r - l > FR_SAMPLING_SIZE) {
615 // Floyd-Rivest sample step uses the same margins
616 p0 = sampleStep(a, l, r, ka, bounds);
617 if (f <= STEP_FAR_LEFT || f >= STEP_FAR_RIGHT) {
618 margin += (n >> 5) + (n >> 6);
619 } else if (f > STEP_LEFT && f < STEP_RIGHT) {
620 margin -= n >> 5;
621 }
622 } else if (f <= STEP_LEFT) {
623 if (f <= STEP_FAR_LEFT) {
624 margin += (n >> 5) + (n >> 6);
625 p0 = repeatedStepFarLeft(a, l, r, ka, bounds, m);
626 } else {
627 p0 = repeatedStepLeft(a, l, r, ka, bounds, m);
628 }
629 } else if (f >= STEP_RIGHT) {
630 if (f >= STEP_FAR_RIGHT) {
631 margin += (n >> 5) + (n >> 6);
632 p0 = repeatedStepFarRight(a, l, r, ka, bounds, m);
633 } else {
634 p0 = repeatedStepRight(a, l, r, ka, bounds, m);
635 }
636 } else {
637 margin -= n >> 5;
638 p0 = repeatedStep(a, l, r, ka, bounds, m);
639 }
640
641 // Note: Here we expect [ka, kb] to be small and splitting is unlikely.
642 // p0 p1
643 // |l|--|ka|kkkk|kb|--|P|-------------------|r|
644 // |l|----------------|P|--|ka|kkk|kb|------|r|
645 // |l|-----------|ka|k|P|k|kb|--------------|r|
646 final int p1 = bounds[0];
647 if (kb < p0) {
648 // Entirely on left side
649 r = p0 - 1;
650 } else if (ka > p1) {
651 // Entirely on right side
652 l = p1 + 1;
653 } else {
654 // Pivot splits [ka, kb]. Expect ends to be close to the pivot and finish.
655 // Here we set the bounds for use after median-of-medians pivot selection.
656 // In the event there are many equal values this allows collecting those
657 // known to be equal together when moving around the medians sample.
658 if (kb > p1) {
659 sortSelectLeft(a, p1 + 1, r, kb);
660 bounds[0] = kb;
661 }
662 if (ka < p0) {
663 sortSelectRight(a, l, p0 - 1, ka);
664 p0 = ka;
665 }
666 return p0;
667 }
668 // Update mode based on target partition size
669 if (r - l > n - margin) {
670 m++;
671 }
672 }
673 }
674
675 /**
676 * Partition an array slice around a pivot. Partitioning exchanges array elements such
677 * that all elements smaller than pivot are before it and all elements larger than
678 * pivot are after it.
679 *
680 * <p>Partitions a Floyd-Rivest sample around a pivot offset so that the input {@code k} will
681 * fall in the smaller partition when the entire range is partitioned.
682 *
683 * <p>Assumes the range {@code r - l} is large.
684 *
685 * @param a Data array.
686 * @param l Lower bound (inclusive).
687 * @param r Upper bound (inclusive).
688 * @param k Target index.
689 * @param upper Upper bound (inclusive) of the pivot range.
690 * @return Lower bound (inclusive) of the pivot range.
691 */
692 private static int sampleStep(double[] a, int l, int r, int k, int[] upper) {
693 // Floyd-Rivest: use SELECT recursively on a sample of size S to get an estimate
694 // for the (k-l+1)-th smallest element into a[k], biased slightly so that the
695 // (k-l+1)-th element is expected to lie in the smaller set after partitioning.
696 final int n = r - l + 1;
697 final int ith = k - l + 1;
698 final double z = Math.log(n);
699 // sample size = 0.5 * n^(2/3)
700 final double s = 0.5 * Math.exp(0.6666666666666666 * z);
701 final double sd = 0.5 * Math.sqrt(z * s * (n - s) / n) * Integer.signum(ith - (n >> 1));
702 final int ll = Math.max(l, (int) (k - ith * s / n + sd));
703 final int rr = Math.min(r, (int) (k + (n - ith) * s / n + sd));
704 // Sample recursion restarts from [ll, rr]
705 final int p = quickSelectAdaptive(a, ll, rr, k, k, upper, MODE_FR_SAMPLING);
706 return expandPartition(a, l, r, ll, rr, p, upper[0], upper);
707 }
708
709 /**
710 * Partition an array slice around a pivot. Partitioning exchanges array elements such
711 * that all elements smaller than pivot are before it and all elements larger than
712 * pivot are after it.
713 *
714 * <p>Assumes the range {@code r - l >= 8}; the caller is responsible for selection on a smaller
715 * range. If using a 12th-tile for sampling then assumes {@code r - l >= 11}.
716 *
717 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
718 * with the median of 3 then median of 3; the final sample is placed in the
719 * 5th 9th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
720 *
721 * <p>Given a pivot in the middle of the sample this has margins of 2/9 and 2/9.
722 *
723 * @param a Data array.
724 * @param l Lower bound (inclusive).
725 * @param r Upper bound (inclusive).
726 * @param k Target index.
727 * @param upper Upper bound (inclusive) of the pivot range.
728 * @param flags Control flags.
729 * @return Lower bound (inclusive) of the pivot range.
730 */
731 private static int repeatedStep(double[] a, int l, int r, int k, int[] upper, int flags) {
732 // Adapted from Alexandrescu (2016), algorithm 8.
733 final int fp;
734 final int s;
735 int p;
736 if (flags <= MODE_SAMPLING) {
737 // Median into a 12th-tile
738 fp = (r - l + 1) / 12;
739 // Position the sample around the target k
740 s = k - mapDistance(k - l, l, r, fp);
741 p = k;
742 } else {
743 // i in tertile [3f':6f')
744 fp = (r - l + 1) / 9;
745 final int f3 = 3 * fp;
746 final int end = l + (f3 << 1);
747 for (int i = l + f3; i < end; i++) {
748 Sorting.sort3(a, i - f3, i, i + f3);
749 }
750 // 5th 9th-tile: [4f':5f')
751 s = l + (fp << 2);
752 // No adaption uses the middle to enforce strict margins
753 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1));
754 }
755 final int e = s + fp - 1;
756 for (int i = s; i <= e; i++) {
757 Sorting.sort3(a, i - fp, i, i + fp);
758 }
759 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
760 return expandPartition(a, l, r, s, e, p, upper[0], upper);
761 }
762
763 /**
764 * Partition an array slice around a pivot. Partitioning exchanges array elements such
765 * that all elements smaller than pivot are before it and all elements larger than
766 * pivot are after it.
767 *
768 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
769 * range.
770 *
771 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
772 * with the lower median of 4 then either median of 3 with the final sample placed in the
773 * 5th 12th-tile, or min of 3 with the final sample in the 4th 12th-tile;
774 * the pivot chosen from the sample is adaptive using the input {@code k}.
775 *
776 * <p>Given a pivot in the middle of the sample this has margins of 1/6 and 1/4.
777 *
778 * @param a Data array.
779 * @param l Lower bound (inclusive).
780 * @param r Upper bound (inclusive).
781 * @param k Target index.
782 * @param upper Upper bound (inclusive) of the pivot range.
783 * @param flags Control flags.
784 * @return Lower bound (inclusive) of the pivot range.
785 */
786 private static int repeatedStepLeft(double[] a, int l, int r, int k, int[] upper, int flags) {
787 // Adapted from Alexandrescu (2016), algorithm 9.
788 final int fp;
789 final int s;
790 int p;
791 if (flags <= MODE_SAMPLING) {
792 // Median into a 12th-tile
793 fp = (r - l + 1) / 12;
794 // Position the sample around the target k
795 // Avoid bounds error due to rounding as (k-l)/(r-l) -> 1/12
796 s = Math.max(k - mapDistance(k - l, l, r, fp), l + fp);
797 p = k;
798 } else {
799 // i in 2nd quartile
800 final int f = (r - l + 1) >> 2;
801 final int f2 = f + f;
802 final int end = l + f2;
803 for (int i = l + f; i < end; i++) {
804 Sorting.lowerMedian4(a, i - f, i, i + f, i + f2);
805 }
806 // i in 5th 12th-tile
807 fp = f / 3;
808 s = l + f + fp;
809 // No adaption uses the middle to enforce strict margins
810 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1));
811 }
812 final int e = s + fp - 1;
813 for (int i = s; i <= e; i++) {
814 Sorting.sort3(a, i - fp, i, i + fp);
815 }
816 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
817 return expandPartition(a, l, r, s, e, p, upper[0], upper);
818 }
819
820 /**
821 * Partition an array slice around a pivot. Partitioning exchanges array elements such
822 * that all elements smaller than pivot are before it and all elements larger than
823 * pivot are after it.
824 *
825 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
826 * range.
827 *
828 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
829 * with the upper median of 4 then either median of 3 with the final sample placed in the
830 * 8th 12th-tile, or max of 3 with the final sample in the 9th 12th-tile;
831 * the pivot chosen from the sample is adaptive using the input {@code k}.
832 *
833 * <p>Given a pivot in the middle of the sample this has margins of 1/4 and 1/6.
834 *
835 * @param a Data array.
836 * @param l Lower bound (inclusive).
837 * @param r Upper bound (inclusive).
838 * @param k Target index.
839 * @param upper Upper bound (inclusive) of the pivot range.
840 * @param flags Control flags.
841 * @return Lower bound (inclusive) of the pivot range.
842 */
843 private static int repeatedStepRight(double[] a, int l, int r, int k, int[] upper, int flags) {
844 // Mirror image repeatedStepLeft using upper median into 3rd quartile
845 final int fp;
846 final int e;
847 int p;
848 if (flags <= MODE_SAMPLING) {
849 // Median into a 12th-tile
850 fp = (r - l + 1) / 12;
851 // Position the sample around the target k
852 // Avoid bounds error due to rounding as (r-k)/(r-l) -> 11/12
853 e = Math.min(k + mapDistance(r - k, l, r, fp), r - fp);
854 p = k;
855 } else {
856 // i in 3rd quartile
857 final int f = (r - l + 1) >> 2;
858 final int f2 = f + f;
859 final int end = r - f2;
860 for (int i = r - f; i > end; i--) {
861 Sorting.upperMedian4(a, i - f2, i - f, i, i + f);
862 }
863 // i in 8th 12th-tile
864 fp = f / 3;
865 e = r - f - fp;
866 // No adaption uses the middle to enforce strict margins
867 p = e - (flags == MODE_ADAPTION ? mapDistance(r - k, l, r, fp) : (fp >>> 1));
868 }
869 final int s = e - fp + 1;
870 for (int i = s; i <= e; i++) {
871 Sorting.sort3(a, i - fp, i, i + fp);
872 }
873 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
874 return expandPartition(a, l, r, s, e, p, upper[0], upper);
875 }
876
877 /**
878 * Partition an array slice around a pivot. Partitioning exchanges array elements such
879 * that all elements smaller than pivot are before it and all elements larger than
880 * pivot are after it.
881 *
882 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
883 * range.
884 *
885 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
886 * with the minimum of 4 then median of 3; the final sample is placed in the
887 * 2nd 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
888 *
889 * <p>Given a pivot in the middle of the sample this has margins of 1/12 and 1/3.
890 *
891 * @param a Data array.
892 * @param l Lower bound (inclusive).
893 * @param r Upper bound (inclusive).
894 * @param k Target index.
895 * @param upper Upper bound (inclusive) of the pivot range.
896 * @param flags Control flags.
897 * @return Lower bound (inclusive) of the pivot range.
898 */
899 private static int repeatedStepFarLeft(double[] a, int l, int r, int k, int[] upper, int flags) {
900 // Far step has been changed from the Alexandrescu (2016) step of lower-median-of-4, min-of-3
901 // into the 4th 12th-tile to a min-of-4, median-of-3 into the 2nd 12th-tile.
902 // The differences are:
903 // - The upper margin when not sampling is 8/24 vs. 9/24; the lower margin remains at 1/12.
904 // - The position of the sample is closer to the expected location of k < |A| / 12.
905 // - Sampling mode uses a median-of-3 with adaptive k, matching the other step methods.
906 // A min-of-3 sample can create a pivot too small if used with adaption of k leaving
907 // k in the larger parition and a wasted iteration.
908 // - Adaption is adjusted to force use of the lower margin when not sampling.
909 final int fp;
910 final int s;
911 int p;
912 if (flags <= MODE_SAMPLING) {
913 // 2nd 12th-tile
914 fp = (r - l + 1) / 12;
915 s = l + fp;
916 // Use adaption
917 p = s + mapDistance(k - l, l, r, fp);
918 } else {
919 // i in 2nd quartile; min into i-f (1st quartile)
920 final int f = (r - l + 1) >> 2;
921 final int f2 = f + f;
922 final int end = l + f2;
923 for (int i = l + f; i < end; i++) {
924 if (a[i + f] < a[i - f]) {
925 final double u = a[i + f];
926 a[i + f] = a[i - f];
927 a[i - f] = u;
928 }
929 if (a[i + f2] < a[i]) {
930 final double v = a[i + f2];
931 a[i + f2] = a[i];
932 a[i] = v;
933 }
934 if (a[i] < a[i - f]) {
935 final double u = a[i];
936 a[i] = a[i - f];
937 a[i - f] = u;
938 }
939 }
940 // 2nd 12th-tile
941 fp = f / 3;
942 s = l + fp;
943 // Lower margin has 2(d+1) elements; d == (position in sample) - s
944 // Force k into the lower margin
945 p = s + ((k - l) >>> 1);
946 }
947 final int e = s + fp - 1;
948 for (int i = s; i <= e; i++) {
949 Sorting.sort3(a, i - fp, i, i + fp);
950 }
951 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
952 return expandPartition(a, l, r, s, e, p, upper[0], upper);
953 }
954
955 /**
956 * Partition an array slice around a pivot. Partitioning exchanges array elements such
957 * that all elements smaller than pivot are before it and all elements larger than
958 * pivot are after it.
959 *
960 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
961 * range.
962 *
963 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
964 * with the maximum of 4 then median of 3; the final sample is placed in the
965 * 11th 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
966 *
967 * <p>Given a pivot in the middle of the sample this has margins of 1/3 and 1/12.
968 *
969 * @param a Data array.
970 * @param l Lower bound (inclusive).
971 * @param r Upper bound (inclusive).
972 * @param k Target index.
973 * @param upper Upper bound (inclusive) of the pivot range.
974 * @param flags Control flags.
975 * @return Lower bound (inclusive) of the pivot range.
976 */
977 private static int repeatedStepFarRight(double[] a, int l, int r, int k, int[] upper, int flags) {
978 // Mirror image repeatedStepFarLeft
979 final int fp;
980 final int e;
981 int p;
982 if (flags <= MODE_SAMPLING) {
983 // 11th 12th-tile
984 fp = (r - l + 1) / 12;
985 e = r - fp;
986 // Use adaption
987 p = e - mapDistance(r - k, l, r, fp);
988 } else {
989 // i in 3rd quartile; max into i+f (4th quartile)
990 final int f = (r - l + 1) >> 2;
991 final int f2 = f + f;
992 final int end = r - f2;
993 for (int i = r - f; i > end; i--) {
994 if (a[i - f] > a[i + f]) {
995 final double u = a[i - f];
996 a[i - f] = a[i + f];
997 a[i + f] = u;
998 }
999 if (a[i - f2] > a[i]) {
1000 final double v = a[i - f2];
1001 a[i - f2] = a[i];
1002 a[i] = v;
1003 }
1004 if (a[i] > a[i + f]) {
1005 final double u = a[i];
1006 a[i] = a[i + f];
1007 a[i + f] = u;
1008 }
1009 }
1010 // 11th 12th-tile
1011 fp = f / 3;
1012 e = r - fp;
1013 // Upper margin has 2(d+1) elements; d == e - (position in sample)
1014 // Force k into the upper margin
1015 p = e - ((r - k) >>> 1);
1016 }
1017 final int s = e - fp + 1;
1018 for (int i = s; i <= e; i++) {
1019 Sorting.sort3(a, i - fp, i, i + fp);
1020 }
1021 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
1022 return expandPartition(a, l, r, s, e, p, upper[0], upper);
1023 }
1024
1025 /**
1026 * Expand a partition around a single pivot. Partitioning exchanges array
1027 * elements such that all elements smaller than pivot are before it and all
1028 * elements larger than pivot are after it. The central region is already
1029 * partitioned.
1030 *
1031 * <pre>{@code
1032 * |l |s |p0 p1| e| r|
1033 * | ??? | <P | ==P | >P | ??? |
1034 * }</pre>
1035 *
1036 * <p>This requires that {@code start != end}. However it handles
1037 * {@code left == start} and/or {@code end == right}.
1038 *
1039 * @param a Data array.
1040 * @param left Lower bound (inclusive).
1041 * @param right Upper bound (inclusive).
1042 * @param start Start of the partition range (inclusive).
1043 * @param end End of the partitioned range (inclusive).
1044 * @param pivot0 Lower pivot location (inclusive).
1045 * @param pivot1 Upper pivot location (inclusive).
1046 * @param upper Upper bound (inclusive) of the pivot range [k1].
1047 * @return Lower bound (inclusive) of the pivot range [k0].
1048 */
1049 // package-private for testing
1050 static int expandPartition(double[] a, int left, int right, int start, int end,
1051 int pivot0, int pivot1, int[] upper) {
1052 // 3-way partition of the data using a pivot value into
1053 // less-than, equal or greater-than.
1054 // Based on Sedgewick's Bentley-McIroy partitioning: always swap i<->j then
1055 // check for equal to the pivot and move again.
1056 //
1057 // Move sentinels from start and end to left and right. Scan towards the
1058 // sentinels until >=,<=. Swap then move == to the pivot region.
1059 // <-i j->
1060 // |l | | |p0 p1| | | r|
1061 // |>=| ??? | < | == | > | ??? |<=|
1062 //
1063 // When either i or j reach the edge perform finishing loop.
1064 // Finish loop for a[j] <= v replaces j with p1+1, optionally moves value
1065 // to p0 for < and updates the pivot range p1 (and optionally p0):
1066 // j->
1067 // |l |p0 p1| | | r|
1068 // | < | == | > | ??? |<=|
1069
1070 final double v = a[pivot0];
1071 // Use start/end as sentinels (requires start != end)
1072 double vi = a[start];
1073 double vj = a[end];
1074 a[start] = a[left];
1075 a[end] = a[right];
1076 a[left] = vj;
1077 a[right] = vi;
1078
1079 int i = start + 1;
1080 int j = end - 1;
1081
1082 // Positioned for pre-in/decrement to write to pivot region
1083 int p0 = pivot0 == start ? i : pivot0;
1084 int p1 = pivot1 == end ? j : pivot1;
1085
1086 while (true) {
1087 do {
1088 --i;
1089 } while (a[i] < v);
1090 do {
1091 ++j;
1092 } while (a[j] > v);
1093 vj = a[i];
1094 vi = a[j];
1095 a[i] = vi;
1096 a[j] = vj;
1097 // Move the equal values to pivot region
1098 if (vi == v) {
1099 a[i] = a[--p0];
1100 a[p0] = v;
1101 }
1102 if (vj == v) {
1103 a[j] = a[++p1];
1104 a[p1] = v;
1105 }
1106 // Termination check and finishing loops.
1107 // Note: This works even if pivot region is zero length (p1 == p0-1 due to
1108 // length 1 pivot region at either start/end) because we pre-inc/decrement
1109 // one side and post-inc/decrement the other side.
1110 if (i == left) {
1111 while (j < right) {
1112 do {
1113 ++j;
1114 } while (a[j] > v);
1115 final double w = a[j];
1116 // Move upper bound of pivot region
1117 a[j] = a[++p1];
1118 a[p1] = v;
1119 // Move lower bound of pivot region
1120 if (w != v) {
1121 a[p0] = w;
1122 p0++;
1123 }
1124 }
1125 break;
1126 }
1127 if (j == right) {
1128 while (i > left) {
1129 do {
1130 --i;
1131 } while (a[i] < v);
1132 final double w = a[i];
1133 // Move lower bound of pivot region
1134 a[i] = a[--p0];
1135 a[p0] = v;
1136 // Move upper bound of pivot region
1137 if (w != v) {
1138 a[p1] = w;
1139 p1--;
1140 }
1141 }
1142 break;
1143 }
1144 }
1145
1146 upper[0] = p1;
1147 return p0;
1148 }
1149
1150 /**
1151 * Partition the array such that indices {@code k} correspond to their
1152 * correctly sorted value in the equivalent fully sorted array.
1153 *
1154 * <p>For all indices {@code k} and any index {@code i}:
1155 *
1156 * <pre>{@code
1157 * data[i < k] <= data[k] <= data[k < i]
1158 * }</pre>
1159 *
1160 * <p>This function accepts a {@link UpdatingInterval} of indices {@code k} that define the
1161 * range of indices to partition. The {@link UpdatingInterval} can be narrowed or split as
1162 * partitioning divides the range.
1163 *
1164 * <p>Uses an introselect variant. The quickselect is a dual-pivot quicksort
1165 * partition method by Vladimir Yaroslavskiy; the fall-back on poor convergence of
1166 * the quickselect is a heapselect.
1167 *
1168 * <p>The {@code flags} contain the the current recursion count and the configured
1169 * length threshold for {@code r - l} to perform sort select. The count is in the upper
1170 * bits and the threshold is in the lower bits.
1171 *
1172 * @param a Values.
1173 * @param left Lower bound of data (inclusive, assumed to be strictly positive).
1174 * @param right Upper bound of data (inclusive, assumed to be strictly positive).
1175 * @param k Interval of indices to partition (ordered).
1176 * @param flags Control flags.
1177 */
1178 // package-private for testing
1179 static void dualPivotQuickSelect(double[] a, int left, int right, UpdatingInterval k, int flags) {
1180 // If partitioning splits the interval then recursion is used for the left-most side(s)
1181 // and the right-most side remains within this function. If partitioning does
1182 // not split the interval then it remains within this function.
1183 int l = left;
1184 int r = right;
1185 int f = flags;
1186 int ka = k.left();
1187 int kb = k.right();
1188 final int[] upper = {0, 0, 0};
1189 while (true) {
1190 // Select when ka and kb are close to the same end,
1191 // or the entire range is small
1192 // |l|-----|ka|--------|kb|------|r|
1193 final int n = r - l;
1194 if (Math.min(kb - l, r - ka) < DP_SORTSELECT_SIZE ||
1195 n < (f & SORTSELECT_MASK)) {
1196 sortSelect(a, l, r, ka, kb);
1197 return;
1198 }
1199 if (kb - ka < DP_SORTSELECT_SIZE) {
1200 // Switch to single-pivot mode with Floyd-Rivest sub-sampling
1201 quickSelectAdaptive(a, l, r, ka, kb, upper, MODE_FR_SAMPLING);
1202 return;
1203 }
1204 if (f < 0) {
1205 // Excess recursion, switch to heap select
1206 heapSelect(a, l, r, ka, kb);
1207 return;
1208 }
1209
1210 // Dual-pivot partitioning
1211 final int p0 = partition(a, l, r, upper);
1212 final int p1 = upper[0];
1213
1214 // Recursion to max depth
1215 // Note: Here we possibly branch left, middle and right with multiple keys.
1216 // It is possible that the partition has split the keys
1217 // and the recursion proceeds with a reduced set in each region.
1218 // p0 p1 p2 p3
1219 // |l|--|ka|--k----k--|P|------k--|kb|----|P|----|r|
1220 // kb | ka
1221 f += RECURSION_INCREMENT;
1222 // Recurse left side if required
1223 if (ka < p0) {
1224 if (kb <= p1) {
1225 // Entirely on left side
1226 r = p0 - 1;
1227 if (r < kb) {
1228 kb = k.updateRight(r);
1229 }
1230 continue;
1231 }
1232 dualPivotQuickSelect(a, l, p0 - 1, k.splitLeft(p0, p1), f);
1233 // Here we must process middle and/or right
1234 ka = k.left();
1235 } else if (kb <= p1) {
1236 // No middle/right side
1237 return;
1238 } else if (ka <= p1) {
1239 // Advance lower bound
1240 ka = k.updateLeft(p1 + 1);
1241 }
1242 // Recurse middle if required
1243 final int p2 = upper[1];
1244 final int p3 = upper[2];
1245 if (ka < p2) {
1246 l = p1 + 1;
1247 if (kb <= p3) {
1248 // Entirely in middle
1249 r = p2 - 1;
1250 if (r < kb) {
1251 kb = k.updateRight(r);
1252 }
1253 continue;
1254 }
1255 dualPivotQuickSelect(a, l, p2 - 1, k.splitLeft(p2, p3), f);
1256 ka = k.left();
1257 } else if (kb <= p3) {
1258 // No right side
1259 return;
1260 } else if (ka <= p3) {
1261 ka = k.updateLeft(p3 + 1);
1262 }
1263 // Continue right
1264 l = p3 + 1;
1265 }
1266 }
1267
1268 /**
1269 * Partition an array slice around 2 pivots. Partitioning exchanges array elements
1270 * such that all elements smaller than pivot are before it and all elements larger
1271 * than pivot are after it.
1272 *
1273 * <p>This method returns 4 points describing the pivot ranges of equal values.
1274 *
1275 * <pre>{@code
1276 * |k0 k1| |k2 k3|
1277 * | <P | ==P1 | <P1 && <P2 | ==P2 | >P |
1278 * }</pre>
1279 *
1280 * <ul>
1281 * <li>k0: lower pivot1 point
1282 * <li>k1: upper pivot1 point (inclusive)
1283 * <li>k2: lower pivot2 point
1284 * <li>k3: upper pivot2 point (inclusive)
1285 * </ul>
1286 *
1287 * <p>Bounds are set so {@code i < k0}, {@code i > k3} and {@code k1 < i < k2} are
1288 * unsorted. When the range {@code [k0, k3]} contains fully sorted elements the result
1289 * is set to {@code k1 = k3; k2 == k0}. This can occur if
1290 * {@code P1 == P2} or there are zero or one value between the pivots
1291 * {@code P1 < v < P2}. Any sort/partition of ranges [left, k0-1], [k1+1, k2-1] and
1292 * [k3+1, right] must check the length is {@code > 1}.
1293 *
1294 * @param a Data array.
1295 * @param left Lower bound (inclusive).
1296 * @param right Upper bound (inclusive).
1297 * @param bounds Points [k1, k2, k3].
1298 * @return Lower bound (inclusive) of the pivot range [k0].
1299 */
1300 private static int partition(double[] a, int left, int right, int[] bounds) {
1301 // Pick 2 pivots from 5 approximately uniform through the range.
1302 // Spacing is ~ 1/7 made using shifts. Other strategies are equal or much
1303 // worse. 1/7 = 5/35 ~ 1/8 + 1/64 : 0.1429 ~ 0.1406
1304 // Ensure the value is above zero to choose different points!
1305 final int n = right - left;
1306 final int step = 1 + (n >>> 3) + (n >>> 6);
1307 final int i3 = left + (n >>> 1);
1308 final int i2 = i3 - step;
1309 final int i1 = i2 - step;
1310 final int i4 = i3 + step;
1311 final int i5 = i4 + step;
1312 Sorting.sort5(a, i1, i2, i3, i4, i5);
1313
1314 // Partition data using pivots P1 and P2 into less-than, greater-than or between.
1315 // Pivot values P1 & P2 are placed at the end. If P1 < P2, P2 acts as a sentinel.
1316 // k traverses the unknown region ??? and values moved if less-than or
1317 // greater-than:
1318 //
1319 // left less k great right
1320 // |P1| <P1 | P1 <= & <= P2 | ??? | >P2 |P2|
1321 //
1322 // <P1 (left, lt)
1323 // P1 <= & <= P2 [lt, k)
1324 // >P2 (gt, right)
1325 //
1326 // At the end pivots are swapped back to behind the less and great pointers.
1327 //
1328 // | <P1 |P1| P1<= & <= P2 |P2| >P2 |
1329
1330 // Swap ends to the pivot locations.
1331 final double v1 = a[i2];
1332 a[i2] = a[left];
1333 a[left] = v1;
1334 final double v2 = a[i4];
1335 a[i4] = a[right];
1336 a[right] = v2;
1337
1338 // pointers
1339 int less = left;
1340 int great = right;
1341
1342 // Fast-forward ascending / descending runs to reduce swaps.
1343 // Cannot overrun as end pivots (v1 <= v2) act as sentinels.
1344 do {
1345 ++less;
1346 } while (a[less] < v1);
1347 do {
1348 --great;
1349 } while (a[great] > v2);
1350
1351 // a[less - 1] < P1 : a[great + 1] > P2
1352 // unvisited in [less, great]
1353 SORTING:
1354 for (int k = less - 1; ++k <= great;) {
1355 final double v = a[k];
1356 if (v < v1) {
1357 // swap(a, k, less++)
1358 a[k] = a[less];
1359 a[less] = v;
1360 less++;
1361 } else if (v > v2) {
1362 // while k < great and a[great] > v2:
1363 // great--
1364 while (a[great] > v2) {
1365 if (great-- == k) {
1366 // Done
1367 break SORTING;
1368 }
1369 }
1370 // swap(a, k, great--)
1371 // if a[k] < v1:
1372 // swap(a, k, less++)
1373 final double w = a[great];
1374 a[great] = v;
1375 great--;
1376 // delay a[k] = w
1377 if (w < v1) {
1378 a[k] = a[less];
1379 a[less] = w;
1380 less++;
1381 } else {
1382 a[k] = w;
1383 }
1384 }
1385 }
1386
1387 // Change to inclusive ends : a[less] < P1 : a[great] > P2
1388 less--;
1389 great++;
1390 // Move the pivots to correct locations
1391 a[left] = a[less];
1392 a[less] = v1;
1393 a[right] = a[great];
1394 a[great] = v2;
1395
1396 // Record the pivot locations
1397 final int lower = less;
1398 bounds[2] = great;
1399
1400 // equal elements
1401 // Original paper: If middle partition is bigger than a threshold
1402 // then check for equal elements.
1403
1404 // Note: This is extra work. When performing partitioning the region of interest
1405 // may be entirely above or below the central region and this can be skipped.
1406
1407 // Here we look for equal elements if the centre is more than 5/8 the length.
1408 // 5/8 = 1/2 + 1/8. Pivots must be different.
1409 if ((great - less) > (n >>> 1) + (n >>> 3) && v1 != v2) {
1410
1411 // Fast-forward to reduce swaps. Changes inclusive ends to exclusive ends.
1412 // Since v1 != v2 these act as sentinels to prevent overrun.
1413 do {
1414 ++less;
1415 } while (a[less] == v1);
1416 do {
1417 --great;
1418 } while (a[great] == v2);
1419
1420 // This copies the logic in the sorting loop using == comparisons
1421 EQUAL:
1422 for (int k = less - 1; ++k <= great;) {
1423 final double v = a[k];
1424 if (v == v1) {
1425 a[k] = a[less];
1426 a[less] = v;
1427 less++;
1428 } else if (v == v2) {
1429 while (a[great] == v2) {
1430 if (great-- == k) {
1431 // Done
1432 break EQUAL;
1433 }
1434 }
1435 final double w = a[great];
1436 a[great] = v;
1437 great--;
1438 if (w == v1) {
1439 a[k] = a[less];
1440 a[less] = w;
1441 less++;
1442 } else {
1443 a[k] = w;
1444 }
1445 }
1446 }
1447
1448 // Change to inclusive ends
1449 less--;
1450 great++;
1451 }
1452
1453 // Between pivots in (less, great)
1454 if (v1 != v2 && less < great - 1) {
1455 // Record the pivot end points
1456 bounds[0] = less;
1457 bounds[1] = great;
1458 } else {
1459 // No unsorted internal region (set k1 = k3; k2 = k0)
1460 bounds[0] = bounds[2];
1461 bounds[1] = lower;
1462 }
1463
1464 return lower;
1465 }
1466
1467 /**
1468 * Partition the elements between {@code ka} and {@code kb} using a heap select
1469 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
1470 *
1471 * @param a Data array to use to find out the K<sup>th</sup> value.
1472 * @param left Lower bound (inclusive).
1473 * @param right Upper bound (inclusive).
1474 * @param ka Lower index to select.
1475 * @param kb Upper index to select.
1476 */
1477 static void heapSelect(int[] a, int left, int right, int ka, int kb) {
1478 if (right <= left) {
1479 return;
1480 }
1481 // Use the smallest heap
1482 if (kb - left < right - ka) {
1483 heapSelectLeft(a, left, right, ka, kb);
1484 } else {
1485 heapSelectRight(a, left, right, ka, kb);
1486 }
1487 }
1488
1489 /**
1490 * Partition the elements between {@code ka} and {@code kb} using a heap select
1491 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
1492 *
1493 * <p>For best performance this should be called with {@code k} in the lower
1494 * half of the range.
1495 *
1496 * @param a Data array to use to find out the K<sup>th</sup> value.
1497 * @param left Lower bound (inclusive).
1498 * @param right Upper bound (inclusive).
1499 * @param ka Lower index to select.
1500 * @param kb Upper index to select.
1501 */
1502 static void heapSelectLeft(int[] a, int left, int right, int ka, int kb) {
1503 // Create a max heap in-place in [left, k], rooted at a[left] = max
1504 // |l|-max-heap-|k|--------------|
1505 // Build the heap using Floyd's heap-construction algorithm for heap size n.
1506 // Start at parent of the last element in the heap (k),
1507 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = k - left
1508 int end = kb + 1;
1509 for (int p = left + ((kb - left - 1) >> 1); p >= left; p--) {
1510 maxHeapSiftDown(a, a[p], p, left, end);
1511 }
1512 // Scan the remaining data and insert
1513 // Mitigate worst case performance on descending data by backward sweep
1514 int max = a[left];
1515 for (int i = right + 1; --i > kb;) {
1516 final int v = a[i];
1517 if (v < max) {
1518 a[i] = max;
1519 maxHeapSiftDown(a, v, left, left, end);
1520 max = a[left];
1521 }
1522 }
1523 // Partition [ka, kb]
1524 // |l|-max-heap-|k|--------------|
1525 // | <-swap-> | then sift down reduced size heap
1526 // Avoid sifting heap of size 1
1527 final int last = Math.max(left, ka - 1);
1528 while (--end > last) {
1529 maxHeapSiftDown(a, a[end], left, left, end);
1530 a[end] = max;
1531 max = a[left];
1532 }
1533 }
1534
1535 /**
1536 * Sift the element down the max heap.
1537 *
1538 * <p>Assumes {@code root <= p < end}, i.e. the max heap is above root.
1539 *
1540 * @param a Heap data.
1541 * @param v Value to sift.
1542 * @param p Start position.
1543 * @param root Root of the heap.
1544 * @param end End of the heap (exclusive).
1545 */
1546 private static void maxHeapSiftDown(int[] a, int v, int p, int root, int end) {
1547 // child2 = root + 2 * (parent - root) + 2
1548 // = 2 * parent - root + 2
1549 while (true) {
1550 // Right child
1551 int c = (p << 1) - root + 2;
1552 if (c > end) {
1553 // No left child
1554 break;
1555 }
1556 // Use the left child if right doesn't exist, or it is greater
1557 if (c == end || a[c] < a[c - 1]) {
1558 --c;
1559 }
1560 if (v >= a[c]) {
1561 // Parent greater than largest child - done
1562 break;
1563 }
1564 // Swap and descend
1565 a[p] = a[c];
1566 p = c;
1567 }
1568 a[p] = v;
1569 }
1570
1571 /**
1572 * Partition the elements between {@code ka} and {@code kb} using a heap select
1573 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
1574 *
1575 * <p>For best performance this should be called with {@code k} in the upper
1576 * half of the range.
1577 *
1578 * @param a Data array to use to find out the K<sup>th</sup> value.
1579 * @param left Lower bound (inclusive).
1580 * @param right Upper bound (inclusive).
1581 * @param ka Lower index to select.
1582 * @param kb Upper index to select.
1583 */
1584 static void heapSelectRight(int[] a, int left, int right, int ka, int kb) {
1585 // Create a min heap in-place in [k, right], rooted at a[right] = min
1586 // |--------------|k|-min-heap-|r|
1587 // Build the heap using Floyd's heap-construction algorithm for heap size n.
1588 // Start at parent of the last element in the heap (k),
1589 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = right - k
1590 int end = ka - 1;
1591 for (int p = right - ((right - ka - 1) >> 1); p <= right; p++) {
1592 minHeapSiftDown(a, a[p], p, right, end);
1593 }
1594 // Scan the remaining data and insert
1595 // Mitigate worst case performance on descending data by backward sweep
1596 int min = a[right];
1597 for (int i = left - 1; ++i < ka;) {
1598 final int v = a[i];
1599 if (v > min) {
1600 a[i] = min;
1601 minHeapSiftDown(a, v, right, right, end);
1602 min = a[right];
1603 }
1604 }
1605 // Partition [ka, kb]
1606 // |--------------|k|-min-heap-|r|
1607 // | <-swap-> | then sift down reduced size heap
1608 // Avoid sifting heap of size 1
1609 final int last = Math.min(right, kb + 1);
1610 while (++end < last) {
1611 minHeapSiftDown(a, a[end], right, right, end);
1612 a[end] = min;
1613 min = a[right];
1614 }
1615 }
1616
1617 /**
1618 * Sift the element down the min heap.
1619 *
1620 * <p>Assumes {@code root >= p > end}, i.e. the max heap is below root.
1621 *
1622 * @param a Heap data.
1623 * @param v Value to sift.
1624 * @param p Start position.
1625 * @param root Root of the heap.
1626 * @param end End of the heap (exclusive).
1627 */
1628 private static void minHeapSiftDown(int[] a, int v, int p, int root, int end) {
1629 // child2 = root - 2 * (root - parent) - 2
1630 // = 2 * parent - root - 2
1631 while (true) {
1632 // Right child
1633 int c = (p << 1) - root - 2;
1634 if (c < end) {
1635 // No left child
1636 break;
1637 }
1638 // Use the left child if right doesn't exist, or it is less
1639 if (c == end || a[c] > a[c + 1]) {
1640 ++c;
1641 }
1642 if (v <= a[c]) {
1643 // Parent less than smallest child - done
1644 break;
1645 }
1646 // Swap and descend
1647 a[p] = a[c];
1648 p = c;
1649 }
1650 a[p] = v;
1651 }
1652
1653 /**
1654 * Partition the elements between {@code ka} and {@code kb} using a sort select
1655 * algorithm. It is assumed {@code left <= ka <= kb <= right}.
1656 *
1657 * @param a Data array to use to find out the K<sup>th</sup> value.
1658 * @param left Lower bound (inclusive).
1659 * @param right Upper bound (inclusive).
1660 * @param ka Lower index to select.
1661 * @param kb Upper index to select.
1662 */
1663 static void sortSelect(int[] a, int left, int right, int ka, int kb) {
1664 // Combine the test for right <= left with
1665 // avoiding the overhead of sort select on tiny data.
1666 if (right - left <= MIN_SORTSELECT_SIZE) {
1667 Sorting.sort(a, left, right);
1668 return;
1669 }
1670 // Sort the smallest side
1671 if (kb - left < right - ka) {
1672 sortSelectLeft(a, left, right, kb);
1673 } else {
1674 sortSelectRight(a, left, right, ka);
1675 }
1676 }
1677
1678 /**
1679 * Partition the minimum {@code n} elements below {@code k} where
1680 * {@code n = k - left + 1}. Uses an insertion sort algorithm.
1681 *
1682 * <p>Works with any {@code k} in the range {@code left <= k <= right}
1683 * and performs a full sort of the range below {@code k}.
1684 *
1685 * <p>For best performance this should be called with
1686 * {@code k - left < right - k}, i.e.
1687 * to partition a value in the lower half of the range.
1688 *
1689 * @param a Data array to use to find out the K<sup>th</sup> value.
1690 * @param left Lower bound (inclusive).
1691 * @param right Upper bound (inclusive).
1692 * @param k Index to select.
1693 */
1694 static void sortSelectLeft(int[] a, int left, int right, int k) {
1695 // Sort
1696 for (int i = left; ++i <= k;) {
1697 final int v = a[i];
1698 // Move preceding higher elements above (if required)
1699 if (v < a[i - 1]) {
1700 int j = i;
1701 while (--j >= left && v < a[j]) {
1702 a[j + 1] = a[j];
1703 }
1704 a[j + 1] = v;
1705 }
1706 }
1707 // Scan the remaining data and insert
1708 // Mitigate worst case performance on descending data by backward sweep
1709 int m = a[k];
1710 for (int i = right + 1; --i > k;) {
1711 final int v = a[i];
1712 if (v < m) {
1713 a[i] = m;
1714 int j = k;
1715 while (--j >= left && v < a[j]) {
1716 a[j + 1] = a[j];
1717 }
1718 a[j + 1] = v;
1719 m = a[k];
1720 }
1721 }
1722 }
1723
1724 /**
1725 * Partition the maximum {@code n} elements above {@code k} where
1726 * {@code n = right - k + 1}. Uses an insertion sort algorithm.
1727 *
1728 * <p>Works with any {@code k} in the range {@code left <= k <= right}
1729 * and can be used to perform a full sort of the range above {@code k}.
1730 *
1731 * <p>For best performance this should be called with
1732 * {@code k - left > right - k}, i.e.
1733 * to partition a value in the upper half of the range.
1734 *
1735 * @param a Data array to use to find out the K<sup>th</sup> value.
1736 * @param left Lower bound (inclusive).
1737 * @param right Upper bound (inclusive).
1738 * @param k Index to select.
1739 */
1740 static void sortSelectRight(int[] a, int left, int right, int k) {
1741 // Sort
1742 for (int i = right; --i >= k;) {
1743 final int v = a[i];
1744 // Move succeeding lower elements below (if required)
1745 if (v > a[i + 1]) {
1746 int j = i;
1747 while (++j <= right && v > a[j]) {
1748 a[j - 1] = a[j];
1749 }
1750 a[j - 1] = v;
1751 }
1752 }
1753 // Scan the remaining data and insert
1754 // Mitigate worst case performance on descending data by backward sweep
1755 int m = a[k];
1756 for (int i = left - 1; ++i < k;) {
1757 final int v = a[i];
1758 if (v > m) {
1759 a[i] = m;
1760 int j = k;
1761 while (++j <= right && v > a[j]) {
1762 a[j - 1] = a[j];
1763 }
1764 a[j - 1] = v;
1765 m = a[k];
1766 }
1767 }
1768 }
1769
1770 /**
1771 * Partition the array such that index {@code k} corresponds to its correctly
1772 * sorted value in the equivalent fully sorted array.
1773 *
1774 * <p>Assumes {@code k} is a valid index into [left, right].
1775 *
1776 * @param a Values.
1777 * @param left Lower bound of data (inclusive).
1778 * @param right Upper bound of data (inclusive).
1779 * @param k Index.
1780 */
1781 static void select(int[] a, int left, int right, int k) {
1782 quickSelectAdaptive(a, left, right, k, k, new int[1], MODE_FR_SAMPLING);
1783 }
1784
1785 /**
1786 * Partition the array such that indices {@code k} correspond to their correctly
1787 * sorted value in the equivalent fully sorted array.
1788 *
1789 * <p>The count of the number of used indices is returned. If the keys are sorted in-place,
1790 * the count is returned as a negative.
1791 *
1792 * @param a Values.
1793 * @param left Lower bound of data (inclusive).
1794 * @param right Upper bound of data (inclusive).
1795 * @param k Indices (may be destructively modified).
1796 * @param n Count of indices.
1797 * @throws IndexOutOfBoundsException if any index {@code k} is not within the
1798 * sub-range {@code [left, right]}
1799 */
1800 static void select(int[] a, int left, int right, int[] k, int n) {
1801 if (n == 1) {
1802 quickSelectAdaptive(a, left, right, k[0], k[0], new int[1], MODE_FR_SAMPLING);
1803 return;
1804 }
1805
1806 // Interval creation validates the indices are in [left, right]
1807 final UpdatingInterval keys = IndexSupport.createUpdatingInterval(left, right, k, n);
1808
1809 // Note: If the keys are not separated then they are effectively a single key.
1810 // Any split of keys separated by the sort select size
1811 // will be finished on the next iteration.
1812 final int k1 = keys.left();
1813 final int kn = keys.right();
1814 if (kn - k1 < DP_SORTSELECT_SIZE) {
1815 quickSelectAdaptive(a, left, right, k1, kn, new int[1], MODE_FR_SAMPLING);
1816 } else {
1817 // Dual-pivot mode with small range sort length configured using index density
1818 dualPivotQuickSelect(a, left, right, keys, dualPivotFlags(left, right, k1, kn));
1819 }
1820 }
1821
1822 /**
1823 * Partition the array such that indices {@code k} correspond to their
1824 * correctly sorted value in the equivalent fully sorted array.
1825 *
1826 * <p>For all indices {@code [ka, kb]} and any index {@code i}:
1827 *
1828 * <pre>{@code
1829 * data[i < ka] <= data[ka] <= data[kb] <= data[kb < i]
1830 * }</pre>
1831 *
1832 * <p>This function accepts indices {@code [ka, kb]} that define the
1833 * range of indices to partition. It is expected that the range is small.
1834 *
1835 * <p>The {@code flags} are used to control the sampling mode and adaption of
1836 * the index within the sample.
1837 *
1838 * <p>Returns the bounds containing {@code [ka, kb]}. These may be lower/higher
1839 * than the keys if equal values are present in the data.
1840 *
1841 * @param a Values.
1842 * @param left Lower bound of data (inclusive, assumed to be strictly positive).
1843 * @param right Upper bound of data (inclusive, assumed to be strictly positive).
1844 * @param ka First key of interest.
1845 * @param kb Last key of interest.
1846 * @param bounds Upper bound of the range containing {@code [ka, kb]} (inclusive).
1847 * @param flags Adaption flags.
1848 * @return Lower bound of the range containing {@code [ka, kb]} (inclusive).
1849 */
1850 static int quickSelectAdaptive(int[] a, int left, int right, int ka, int kb,
1851 int[] bounds, int flags) {
1852 int l = left;
1853 int r = right;
1854 int m = flags;
1855 while (true) {
1856 // Select when ka and kb are close to the same end
1857 // |l|-----|ka|kkkkkkkk|kb|------|r|
1858 if (Math.min(kb - l, r - ka) < LINEAR_SORTSELECT_SIZE) {
1859 sortSelect(a, l, r, ka, kb);
1860 bounds[0] = kb;
1861 return ka;
1862 }
1863
1864 // Only target ka; kb is assumed to be close
1865 int p0;
1866 final int n = r - l;
1867 // f in [0, 1]
1868 final double f = (double) (ka - l) / n;
1869 // Record the larger margin (start at 1/4) to create the estimated size.
1870 // step L R
1871 // far left 1/12 1/3 (use 1/4 + 1/32 + 1/64 ~ 0.328)
1872 // left 1/6 1/4
1873 // middle 2/9 2/9 (use 1/4 - 1/32 ~ 0.219)
1874 int margin = n >> 2;
1875 if (m < MODE_SAMPLING && r - l > FR_SAMPLING_SIZE) {
1876 // Floyd-Rivest sample step uses the same margins
1877 p0 = sampleStep(a, l, r, ka, bounds);
1878 if (f <= STEP_FAR_LEFT || f >= STEP_FAR_RIGHT) {
1879 margin += (n >> 5) + (n >> 6);
1880 } else if (f > STEP_LEFT && f < STEP_RIGHT) {
1881 margin -= n >> 5;
1882 }
1883 } else if (f <= STEP_LEFT) {
1884 if (f <= STEP_FAR_LEFT) {
1885 margin += (n >> 5) + (n >> 6);
1886 p0 = repeatedStepFarLeft(a, l, r, ka, bounds, m);
1887 } else {
1888 p0 = repeatedStepLeft(a, l, r, ka, bounds, m);
1889 }
1890 } else if (f >= STEP_RIGHT) {
1891 if (f >= STEP_FAR_RIGHT) {
1892 margin += (n >> 5) + (n >> 6);
1893 p0 = repeatedStepFarRight(a, l, r, ka, bounds, m);
1894 } else {
1895 p0 = repeatedStepRight(a, l, r, ka, bounds, m);
1896 }
1897 } else {
1898 margin -= n >> 5;
1899 p0 = repeatedStep(a, l, r, ka, bounds, m);
1900 }
1901
1902 // Note: Here we expect [ka, kb] to be small and splitting is unlikely.
1903 // p0 p1
1904 // |l|--|ka|kkkk|kb|--|P|-------------------|r|
1905 // |l|----------------|P|--|ka|kkk|kb|------|r|
1906 // |l|-----------|ka|k|P|k|kb|--------------|r|
1907 final int p1 = bounds[0];
1908 if (kb < p0) {
1909 // Entirely on left side
1910 r = p0 - 1;
1911 } else if (ka > p1) {
1912 // Entirely on right side
1913 l = p1 + 1;
1914 } else {
1915 // Pivot splits [ka, kb]. Expect ends to be close to the pivot and finish.
1916 // Here we set the bounds for use after median-of-medians pivot selection.
1917 // In the event there are many equal values this allows collecting those
1918 // known to be equal together when moving around the medians sample.
1919 if (kb > p1) {
1920 sortSelectLeft(a, p1 + 1, r, kb);
1921 bounds[0] = kb;
1922 }
1923 if (ka < p0) {
1924 sortSelectRight(a, l, p0 - 1, ka);
1925 p0 = ka;
1926 }
1927 return p0;
1928 }
1929 // Update mode based on target partition size
1930 if (r - l > n - margin) {
1931 m++;
1932 }
1933 }
1934 }
1935
1936 /**
1937 * Partition an array slice around a pivot. Partitioning exchanges array elements such
1938 * that all elements smaller than pivot are before it and all elements larger than
1939 * pivot are after it.
1940 *
1941 * <p>Partitions a Floyd-Rivest sample around a pivot offset so that the input {@code k} will
1942 * fall in the smaller partition when the entire range is partitioned.
1943 *
1944 * <p>Assumes the range {@code r - l} is large.
1945 *
1946 * @param a Data array.
1947 * @param l Lower bound (inclusive).
1948 * @param r Upper bound (inclusive).
1949 * @param k Target index.
1950 * @param upper Upper bound (inclusive) of the pivot range.
1951 * @return Lower bound (inclusive) of the pivot range.
1952 */
1953 private static int sampleStep(int[] a, int l, int r, int k, int[] upper) {
1954 // Floyd-Rivest: use SELECT recursively on a sample of size S to get an estimate
1955 // for the (k-l+1)-th smallest element into a[k], biased slightly so that the
1956 // (k-l+1)-th element is expected to lie in the smaller set after partitioning.
1957 final int n = r - l + 1;
1958 final int ith = k - l + 1;
1959 final double z = Math.log(n);
1960 // sample size = 0.5 * n^(2/3)
1961 final double s = 0.5 * Math.exp(0.6666666666666666 * z);
1962 final double sd = 0.5 * Math.sqrt(z * s * (n - s) / n) * Integer.signum(ith - (n >> 1));
1963 final int ll = Math.max(l, (int) (k - ith * s / n + sd));
1964 final int rr = Math.min(r, (int) (k + (n - ith) * s / n + sd));
1965 // Sample recursion restarts from [ll, rr]
1966 final int p = quickSelectAdaptive(a, ll, rr, k, k, upper, MODE_FR_SAMPLING);
1967 return expandPartition(a, l, r, ll, rr, p, upper[0], upper);
1968 }
1969
1970 /**
1971 * Partition an array slice around a pivot. Partitioning exchanges array elements such
1972 * that all elements smaller than pivot are before it and all elements larger than
1973 * pivot are after it.
1974 *
1975 * <p>Assumes the range {@code r - l >= 8}; the caller is responsible for selection on a smaller
1976 * range. If using a 12th-tile for sampling then assumes {@code r - l >= 11}.
1977 *
1978 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
1979 * with the median of 3 then median of 3; the final sample is placed in the
1980 * 5th 9th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
1981 *
1982 * <p>Given a pivot in the middle of the sample this has margins of 2/9 and 2/9.
1983 *
1984 * @param a Data array.
1985 * @param l Lower bound (inclusive).
1986 * @param r Upper bound (inclusive).
1987 * @param k Target index.
1988 * @param upper Upper bound (inclusive) of the pivot range.
1989 * @param flags Control flags.
1990 * @return Lower bound (inclusive) of the pivot range.
1991 */
1992 private static int repeatedStep(int[] a, int l, int r, int k, int[] upper, int flags) {
1993 // Adapted from Alexandrescu (2016), algorithm 8.
1994 final int fp;
1995 final int s;
1996 int p;
1997 if (flags <= MODE_SAMPLING) {
1998 // Median into a 12th-tile
1999 fp = (r - l + 1) / 12;
2000 // Position the sample around the target k
2001 s = k - mapDistance(k - l, l, r, fp);
2002 p = k;
2003 } else {
2004 // i in tertile [3f':6f')
2005 fp = (r - l + 1) / 9;
2006 final int f3 = 3 * fp;
2007 final int end = l + (f3 << 1);
2008 for (int i = l + f3; i < end; i++) {
2009 Sorting.sort3(a, i - f3, i, i + f3);
2010 }
2011 // 5th 9th-tile: [4f':5f')
2012 s = l + (fp << 2);
2013 // No adaption uses the middle to enforce strict margins
2014 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1));
2015 }
2016 final int e = s + fp - 1;
2017 for (int i = s; i <= e; i++) {
2018 Sorting.sort3(a, i - fp, i, i + fp);
2019 }
2020 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
2021 return expandPartition(a, l, r, s, e, p, upper[0], upper);
2022 }
2023
2024 /**
2025 * Partition an array slice around a pivot. Partitioning exchanges array elements such
2026 * that all elements smaller than pivot are before it and all elements larger than
2027 * pivot are after it.
2028 *
2029 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
2030 * range.
2031 *
2032 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
2033 * with the lower median of 4 then either median of 3 with the final sample placed in the
2034 * 5th 12th-tile, or min of 3 with the final sample in the 4th 12th-tile;
2035 * the pivot chosen from the sample is adaptive using the input {@code k}.
2036 *
2037 * <p>Given a pivot in the middle of the sample this has margins of 1/6 and 1/4.
2038 *
2039 * @param a Data array.
2040 * @param l Lower bound (inclusive).
2041 * @param r Upper bound (inclusive).
2042 * @param k Target index.
2043 * @param upper Upper bound (inclusive) of the pivot range.
2044 * @param flags Control flags.
2045 * @return Lower bound (inclusive) of the pivot range.
2046 */
2047 private static int repeatedStepLeft(int[] a, int l, int r, int k, int[] upper, int flags) {
2048 // Adapted from Alexandrescu (2016), algorithm 9.
2049 final int fp;
2050 final int s;
2051 int p;
2052 if (flags <= MODE_SAMPLING) {
2053 // Median into a 12th-tile
2054 fp = (r - l + 1) / 12;
2055 // Position the sample around the target k
2056 // Avoid bounds error due to rounding as (k-l)/(r-l) -> 1/12
2057 s = Math.max(k - mapDistance(k - l, l, r, fp), l + fp);
2058 p = k;
2059 } else {
2060 // i in 2nd quartile
2061 final int f = (r - l + 1) >> 2;
2062 final int f2 = f + f;
2063 final int end = l + f2;
2064 for (int i = l + f; i < end; i++) {
2065 Sorting.lowerMedian4(a, i - f, i, i + f, i + f2);
2066 }
2067 // i in 5th 12th-tile
2068 fp = f / 3;
2069 s = l + f + fp;
2070 // No adaption uses the middle to enforce strict margins
2071 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1));
2072 }
2073 final int e = s + fp - 1;
2074 for (int i = s; i <= e; i++) {
2075 Sorting.sort3(a, i - fp, i, i + fp);
2076 }
2077 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
2078 return expandPartition(a, l, r, s, e, p, upper[0], upper);
2079 }
2080
2081 /**
2082 * Partition an array slice around a pivot. Partitioning exchanges array elements such
2083 * that all elements smaller than pivot are before it and all elements larger than
2084 * pivot are after it.
2085 *
2086 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
2087 * range.
2088 *
2089 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
2090 * with the upper median of 4 then either median of 3 with the final sample placed in the
2091 * 8th 12th-tile, or max of 3 with the final sample in the 9th 12th-tile;
2092 * the pivot chosen from the sample is adaptive using the input {@code k}.
2093 *
2094 * <p>Given a pivot in the middle of the sample this has margins of 1/4 and 1/6.
2095 *
2096 * @param a Data array.
2097 * @param l Lower bound (inclusive).
2098 * @param r Upper bound (inclusive).
2099 * @param k Target index.
2100 * @param upper Upper bound (inclusive) of the pivot range.
2101 * @param flags Control flags.
2102 * @return Lower bound (inclusive) of the pivot range.
2103 */
2104 private static int repeatedStepRight(int[] a, int l, int r, int k, int[] upper, int flags) {
2105 // Mirror image repeatedStepLeft using upper median into 3rd quartile
2106 final int fp;
2107 final int e;
2108 int p;
2109 if (flags <= MODE_SAMPLING) {
2110 // Median into a 12th-tile
2111 fp = (r - l + 1) / 12;
2112 // Position the sample around the target k
2113 // Avoid bounds error due to rounding as (r-k)/(r-l) -> 11/12
2114 e = Math.min(k + mapDistance(r - k, l, r, fp), r - fp);
2115 p = k;
2116 } else {
2117 // i in 3rd quartile
2118 final int f = (r - l + 1) >> 2;
2119 final int f2 = f + f;
2120 final int end = r - f2;
2121 for (int i = r - f; i > end; i--) {
2122 Sorting.upperMedian4(a, i - f2, i - f, i, i + f);
2123 }
2124 // i in 8th 12th-tile
2125 fp = f / 3;
2126 e = r - f - fp;
2127 // No adaption uses the middle to enforce strict margins
2128 p = e - (flags == MODE_ADAPTION ? mapDistance(r - k, l, r, fp) : (fp >>> 1));
2129 }
2130 final int s = e - fp + 1;
2131 for (int i = s; i <= e; i++) {
2132 Sorting.sort3(a, i - fp, i, i + fp);
2133 }
2134 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
2135 return expandPartition(a, l, r, s, e, p, upper[0], upper);
2136 }
2137
2138 /**
2139 * Partition an array slice around a pivot. Partitioning exchanges array elements such
2140 * that all elements smaller than pivot are before it and all elements larger than
2141 * pivot are after it.
2142 *
2143 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
2144 * range.
2145 *
2146 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
2147 * with the minimum of 4 then median of 3; the final sample is placed in the
2148 * 2nd 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
2149 *
2150 * <p>Given a pivot in the middle of the sample this has margins of 1/12 and 1/3.
2151 *
2152 * @param a Data array.
2153 * @param l Lower bound (inclusive).
2154 * @param r Upper bound (inclusive).
2155 * @param k Target index.
2156 * @param upper Upper bound (inclusive) of the pivot range.
2157 * @param flags Control flags.
2158 * @return Lower bound (inclusive) of the pivot range.
2159 */
2160 private static int repeatedStepFarLeft(int[] a, int l, int r, int k, int[] upper, int flags) {
2161 // Far step has been changed from the Alexandrescu (2016) step of lower-median-of-4, min-of-3
2162 // into the 4th 12th-tile to a min-of-4, median-of-3 into the 2nd 12th-tile.
2163 // The differences are:
2164 // - The upper margin when not sampling is 8/24 vs. 9/24; the lower margin remains at 1/12.
2165 // - The position of the sample is closer to the expected location of k < |A| / 12.
2166 // - Sampling mode uses a median-of-3 with adaptive k, matching the other step methods.
2167 // A min-of-3 sample can create a pivot too small if used with adaption of k leaving
2168 // k in the larger parition and a wasted iteration.
2169 // - Adaption is adjusted to force use of the lower margin when not sampling.
2170 final int fp;
2171 final int s;
2172 int p;
2173 if (flags <= MODE_SAMPLING) {
2174 // 2nd 12th-tile
2175 fp = (r - l + 1) / 12;
2176 s = l + fp;
2177 // Use adaption
2178 p = s + mapDistance(k - l, l, r, fp);
2179 } else {
2180 // i in 2nd quartile; min into i-f (1st quartile)
2181 final int f = (r - l + 1) >> 2;
2182 final int f2 = f + f;
2183 final int end = l + f2;
2184 for (int i = l + f; i < end; i++) {
2185 if (a[i + f] < a[i - f]) {
2186 final int u = a[i + f];
2187 a[i + f] = a[i - f];
2188 a[i - f] = u;
2189 }
2190 if (a[i + f2] < a[i]) {
2191 final int v = a[i + f2];
2192 a[i + f2] = a[i];
2193 a[i] = v;
2194 }
2195 if (a[i] < a[i - f]) {
2196 final int u = a[i];
2197 a[i] = a[i - f];
2198 a[i - f] = u;
2199 }
2200 }
2201 // 2nd 12th-tile
2202 fp = f / 3;
2203 s = l + fp;
2204 // Lower margin has 2(d+1) elements; d == (position in sample) - s
2205 // Force k into the lower margin
2206 p = s + ((k - l) >>> 1);
2207 }
2208 final int e = s + fp - 1;
2209 for (int i = s; i <= e; i++) {
2210 Sorting.sort3(a, i - fp, i, i + fp);
2211 }
2212 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
2213 return expandPartition(a, l, r, s, e, p, upper[0], upper);
2214 }
2215
2216 /**
2217 * Partition an array slice around a pivot. Partitioning exchanges array elements such
2218 * that all elements smaller than pivot are before it and all elements larger than
2219 * pivot are after it.
2220 *
2221 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller
2222 * range.
2223 *
2224 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm
2225 * with the maximum of 4 then median of 3; the final sample is placed in the
2226 * 11th 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}.
2227 *
2228 * <p>Given a pivot in the middle of the sample this has margins of 1/3 and 1/12.
2229 *
2230 * @param a Data array.
2231 * @param l Lower bound (inclusive).
2232 * @param r Upper bound (inclusive).
2233 * @param k Target index.
2234 * @param upper Upper bound (inclusive) of the pivot range.
2235 * @param flags Control flags.
2236 * @return Lower bound (inclusive) of the pivot range.
2237 */
2238 private static int repeatedStepFarRight(int[] a, int l, int r, int k, int[] upper, int flags) {
2239 // Mirror image repeatedStepFarLeft
2240 final int fp;
2241 final int e;
2242 int p;
2243 if (flags <= MODE_SAMPLING) {
2244 // 11th 12th-tile
2245 fp = (r - l + 1) / 12;
2246 e = r - fp;
2247 // Use adaption
2248 p = e - mapDistance(r - k, l, r, fp);
2249 } else {
2250 // i in 3rd quartile; max into i+f (4th quartile)
2251 final int f = (r - l + 1) >> 2;
2252 final int f2 = f + f;
2253 final int end = r - f2;
2254 for (int i = r - f; i > end; i--) {
2255 if (a[i - f] > a[i + f]) {
2256 final int u = a[i - f];
2257 a[i - f] = a[i + f];
2258 a[i + f] = u;
2259 }
2260 if (a[i - f2] > a[i]) {
2261 final int v = a[i - f2];
2262 a[i - f2] = a[i];
2263 a[i] = v;
2264 }
2265 if (a[i] > a[i + f]) {
2266 final int u = a[i];
2267 a[i] = a[i + f];
2268 a[i + f] = u;
2269 }
2270 }
2271 // 11th 12th-tile
2272 fp = f / 3;
2273 e = r - fp;
2274 // Upper margin has 2(d+1) elements; d == e - (position in sample)
2275 // Force k into the upper margin
2276 p = e - ((r - k) >>> 1);
2277 }
2278 final int s = e - fp + 1;
2279 for (int i = s; i <= e; i++) {
2280 Sorting.sort3(a, i - fp, i, i + fp);
2281 }
2282 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING);
2283 return expandPartition(a, l, r, s, e, p, upper[0], upper);
2284 }
2285
2286 /**
2287 * Expand a partition around a single pivot. Partitioning exchanges array
2288 * elements such that all elements smaller than pivot are before it and all
2289 * elements larger than pivot are after it. The central region is already
2290 * partitioned.
2291 *
2292 * <pre>{@code
2293 * |l |s |p0 p1| e| r|
2294 * | ??? | <P | ==P | >P | ??? |
2295 * }</pre>
2296 *
2297 * <p>This requires that {@code start != end}. However it handles
2298 * {@code left == start} and/or {@code end == right}.
2299 *
2300 * @param a Data array.
2301 * @param left Lower bound (inclusive).
2302 * @param right Upper bound (inclusive).
2303 * @param start Start of the partition range (inclusive).
2304 * @param end End of the partitioned range (inclusive).
2305 * @param pivot0 Lower pivot location (inclusive).
2306 * @param pivot1 Upper pivot location (inclusive).
2307 * @param upper Upper bound (inclusive) of the pivot range [k1].
2308 * @return Lower bound (inclusive) of the pivot range [k0].
2309 */
2310 // package-private for testing
2311 static int expandPartition(int[] a, int left, int right, int start, int end,
2312 int pivot0, int pivot1, int[] upper) {
2313 // 3-way partition of the data using a pivot value into
2314 // less-than, equal or greater-than.
2315 // Based on Sedgewick's Bentley-McIroy partitioning: always swap i<->j then
2316 // check for equal to the pivot and move again.
2317 //
2318 // Move sentinels from start and end to left and right. Scan towards the
2319 // sentinels until >=,<=. Swap then move == to the pivot region.
2320 // <-i j->
2321 // |l | | |p0 p1| | | r|
2322 // |>=| ??? | < | == | > | ??? |<=|
2323 //
2324 // When either i or j reach the edge perform finishing loop.
2325 // Finish loop for a[j] <= v replaces j with p1+1, optionally moves value
2326 // to p0 for < and updates the pivot range p1 (and optionally p0):
2327 // j->
2328 // |l |p0 p1| | | r|
2329 // | < | == | > | ??? |<=|
2330
2331 final int v = a[pivot0];
2332 // Use start/end as sentinels (requires start != end)
2333 int vi = a[start];
2334 int vj = a[end];
2335 a[start] = a[left];
2336 a[end] = a[right];
2337 a[left] = vj;
2338 a[right] = vi;
2339
2340 int i = start + 1;
2341 int j = end - 1;
2342
2343 // Positioned for pre-in/decrement to write to pivot region
2344 int p0 = pivot0 == start ? i : pivot0;
2345 int p1 = pivot1 == end ? j : pivot1;
2346
2347 while (true) {
2348 do {
2349 --i;
2350 } while (a[i] < v);
2351 do {
2352 ++j;
2353 } while (a[j] > v);
2354 vj = a[i];
2355 vi = a[j];
2356 a[i] = vi;
2357 a[j] = vj;
2358 // Move the equal values to pivot region
2359 if (vi == v) {
2360 a[i] = a[--p0];
2361 a[p0] = v;
2362 }
2363 if (vj == v) {
2364 a[j] = a[++p1];
2365 a[p1] = v;
2366 }
2367 // Termination check and finishing loops.
2368 // Note: This works even if pivot region is zero length (p1 == p0-1 due to
2369 // length 1 pivot region at either start/end) because we pre-inc/decrement
2370 // one side and post-inc/decrement the other side.
2371 if (i == left) {
2372 while (j < right) {
2373 do {
2374 ++j;
2375 } while (a[j] > v);
2376 final int w = a[j];
2377 // Move upper bound of pivot region
2378 a[j] = a[++p1];
2379 a[p1] = v;
2380 // Move lower bound of pivot region
2381 if (w != v) {
2382 a[p0] = w;
2383 p0++;
2384 }
2385 }
2386 break;
2387 }
2388 if (j == right) {
2389 while (i > left) {
2390 do {
2391 --i;
2392 } while (a[i] < v);
2393 final int w = a[i];
2394 // Move lower bound of pivot region
2395 a[i] = a[--p0];
2396 a[p0] = v;
2397 // Move upper bound of pivot region
2398 if (w != v) {
2399 a[p1] = w;
2400 p1--;
2401 }
2402 }
2403 break;
2404 }
2405 }
2406
2407 upper[0] = p1;
2408 return p0;
2409 }
2410
2411 /**
2412 * Partition the array such that indices {@code k} correspond to their
2413 * correctly sorted value in the equivalent fully sorted array.
2414 *
2415 * <p>For all indices {@code k} and any index {@code i}:
2416 *
2417 * <pre>{@code
2418 * data[i < k] <= data[k] <= data[k < i]
2419 * }</pre>
2420 *
2421 * <p>This function accepts a {@link UpdatingInterval} of indices {@code k} that define the
2422 * range of indices to partition. The {@link UpdatingInterval} can be narrowed or split as
2423 * partitioning divides the range.
2424 *
2425 * <p>Uses an introselect variant. The quickselect is a dual-pivot quicksort
2426 * partition method by Vladimir Yaroslavskiy; the fall-back on poor convergence of
2427 * the quickselect is a heapselect.
2428 *
2429 * <p>The {@code flags} contain the the current recursion count and the configured
2430 * length threshold for {@code r - l} to perform sort select. The count is in the upper
2431 * bits and the threshold is in the lower bits.
2432 *
2433 * @param a Values.
2434 * @param left Lower bound of data (inclusive, assumed to be strictly positive).
2435 * @param right Upper bound of data (inclusive, assumed to be strictly positive).
2436 * @param k Interval of indices to partition (ordered).
2437 * @param flags Control flags.
2438 */
2439 // package-private for testing
2440 static void dualPivotQuickSelect(int[] a, int left, int right, UpdatingInterval k, int flags) {
2441 // If partitioning splits the interval then recursion is used for the left-most side(s)
2442 // and the right-most side remains within this function. If partitioning does
2443 // not split the interval then it remains within this function.
2444 int l = left;
2445 int r = right;
2446 int f = flags;
2447 int ka = k.left();
2448 int kb = k.right();
2449 final int[] upper = {0, 0, 0};
2450 while (true) {
2451 // Select when ka and kb are close to the same end,
2452 // or the entire range is small
2453 // |l|-----|ka|--------|kb|------|r|
2454 final int n = r - l;
2455 if (Math.min(kb - l, r - ka) < DP_SORTSELECT_SIZE ||
2456 n < (f & SORTSELECT_MASK)) {
2457 sortSelect(a, l, r, ka, kb);
2458 return;
2459 }
2460 if (kb - ka < DP_SORTSELECT_SIZE) {
2461 // Switch to single-pivot mode with Floyd-Rivest sub-sampling
2462 quickSelectAdaptive(a, l, r, ka, kb, upper, MODE_FR_SAMPLING);
2463 return;
2464 }
2465 if (f < 0) {
2466 // Excess recursion, switch to heap select
2467 heapSelect(a, l, r, ka, kb);
2468 return;
2469 }
2470
2471 // Dual-pivot partitioning
2472 final int p0 = partition(a, l, r, upper);
2473 final int p1 = upper[0];
2474
2475 // Recursion to max depth
2476 // Note: Here we possibly branch left, middle and right with multiple keys.
2477 // It is possible that the partition has split the keys
2478 // and the recursion proceeds with a reduced set in each region.
2479 // p0 p1 p2 p3
2480 // |l|--|ka|--k----k--|P|------k--|kb|----|P|----|r|
2481 // kb | ka
2482 f += RECURSION_INCREMENT;
2483 // Recurse left side if required
2484 if (ka < p0) {
2485 if (kb <= p1) {
2486 // Entirely on left side
2487 r = p0 - 1;
2488 if (r < kb) {
2489 kb = k.updateRight(r);
2490 }
2491 continue;
2492 }
2493 dualPivotQuickSelect(a, l, p0 - 1, k.splitLeft(p0, p1), f);
2494 // Here we must process middle and/or right
2495 ka = k.left();
2496 } else if (kb <= p1) {
2497 // No middle/right side
2498 return;
2499 } else if (ka <= p1) {
2500 // Advance lower bound
2501 ka = k.updateLeft(p1 + 1);
2502 }
2503 // Recurse middle if required
2504 final int p2 = upper[1];
2505 final int p3 = upper[2];
2506 if (ka < p2) {
2507 l = p1 + 1;
2508 if (kb <= p3) {
2509 // Entirely in middle
2510 r = p2 - 1;
2511 if (r < kb) {
2512 kb = k.updateRight(r);
2513 }
2514 continue;
2515 }
2516 dualPivotQuickSelect(a, l, p2 - 1, k.splitLeft(p2, p3), f);
2517 ka = k.left();
2518 } else if (kb <= p3) {
2519 // No right side
2520 return;
2521 } else if (ka <= p3) {
2522 ka = k.updateLeft(p3 + 1);
2523 }
2524 // Continue right
2525 l = p3 + 1;
2526 }
2527 }
2528
2529 /**
2530 * Partition an array slice around 2 pivots. Partitioning exchanges array elements
2531 * such that all elements smaller than pivot are before it and all elements larger
2532 * than pivot are after it.
2533 *
2534 * <p>This method returns 4 points describing the pivot ranges of equal values.
2535 *
2536 * <pre>{@code
2537 * |k0 k1| |k2 k3|
2538 * | <P | ==P1 | <P1 && <P2 | ==P2 | >P |
2539 * }</pre>
2540 *
2541 * <ul>
2542 * <li>k0: lower pivot1 point
2543 * <li>k1: upper pivot1 point (inclusive)
2544 * <li>k2: lower pivot2 point
2545 * <li>k3: upper pivot2 point (inclusive)
2546 * </ul>
2547 *
2548 * <p>Bounds are set so {@code i < k0}, {@code i > k3} and {@code k1 < i < k2} are
2549 * unsorted. When the range {@code [k0, k3]} contains fully sorted elements the result
2550 * is set to {@code k1 = k3; k2 == k0}. This can occur if
2551 * {@code P1 == P2} or there are zero or one value between the pivots
2552 * {@code P1 < v < P2}. Any sort/partition of ranges [left, k0-1], [k1+1, k2-1] and
2553 * [k3+1, right] must check the length is {@code > 1}.
2554 *
2555 * @param a Data array.
2556 * @param left Lower bound (inclusive).
2557 * @param right Upper bound (inclusive).
2558 * @param bounds Points [k1, k2, k3].
2559 * @return Lower bound (inclusive) of the pivot range [k0].
2560 */
2561 private static int partition(int[] a, int left, int right, int[] bounds) {
2562 // Pick 2 pivots from 5 approximately uniform through the range.
2563 // Spacing is ~ 1/7 made using shifts. Other strategies are equal or much
2564 // worse. 1/7 = 5/35 ~ 1/8 + 1/64 : 0.1429 ~ 0.1406
2565 // Ensure the value is above zero to choose different points!
2566 final int n = right - left;
2567 final int step = 1 + (n >>> 3) + (n >>> 6);
2568 final int i3 = left + (n >>> 1);
2569 final int i2 = i3 - step;
2570 final int i1 = i2 - step;
2571 final int i4 = i3 + step;
2572 final int i5 = i4 + step;
2573 Sorting.sort5(a, i1, i2, i3, i4, i5);
2574
2575 // Partition data using pivots P1 and P2 into less-than, greater-than or between.
2576 // Pivot values P1 & P2 are placed at the end. If P1 < P2, P2 acts as a sentinel.
2577 // k traverses the unknown region ??? and values moved if less-than or
2578 // greater-than:
2579 //
2580 // left less k great right
2581 // |P1| <P1 | P1 <= & <= P2 | ??? | >P2 |P2|
2582 //
2583 // <P1 (left, lt)
2584 // P1 <= & <= P2 [lt, k)
2585 // >P2 (gt, right)
2586 //
2587 // At the end pivots are swapped back to behind the less and great pointers.
2588 //
2589 // | <P1 |P1| P1<= & <= P2 |P2| >P2 |
2590
2591 // Swap ends to the pivot locations.
2592 final int v1 = a[i2];
2593 a[i2] = a[left];
2594 a[left] = v1;
2595 final int v2 = a[i4];
2596 a[i4] = a[right];
2597 a[right] = v2;
2598
2599 // pointers
2600 int less = left;
2601 int great = right;
2602
2603 // Fast-forward ascending / descending runs to reduce swaps.
2604 // Cannot overrun as end pivots (v1 <= v2) act as sentinels.
2605 do {
2606 ++less;
2607 } while (a[less] < v1);
2608 do {
2609 --great;
2610 } while (a[great] > v2);
2611
2612 // a[less - 1] < P1 : a[great + 1] > P2
2613 // unvisited in [less, great]
2614 SORTING:
2615 for (int k = less - 1; ++k <= great;) {
2616 final int v = a[k];
2617 if (v < v1) {
2618 // swap(a, k, less++)
2619 a[k] = a[less];
2620 a[less] = v;
2621 less++;
2622 } else if (v > v2) {
2623 // while k < great and a[great] > v2:
2624 // great--
2625 while (a[great] > v2) {
2626 if (great-- == k) {
2627 // Done
2628 break SORTING;
2629 }
2630 }
2631 // swap(a, k, great--)
2632 // if a[k] < v1:
2633 // swap(a, k, less++)
2634 final int w = a[great];
2635 a[great] = v;
2636 great--;
2637 // delay a[k] = w
2638 if (w < v1) {
2639 a[k] = a[less];
2640 a[less] = w;
2641 less++;
2642 } else {
2643 a[k] = w;
2644 }
2645 }
2646 }
2647
2648 // Change to inclusive ends : a[less] < P1 : a[great] > P2
2649 less--;
2650 great++;
2651 // Move the pivots to correct locations
2652 a[left] = a[less];
2653 a[less] = v1;
2654 a[right] = a[great];
2655 a[great] = v2;
2656
2657 // Record the pivot locations
2658 final int lower = less;
2659 bounds[2] = great;
2660
2661 // equal elements
2662 // Original paper: If middle partition is bigger than a threshold
2663 // then check for equal elements.
2664
2665 // Note: This is extra work. When performing partitioning the region of interest
2666 // may be entirely above or below the central region and this can be skipped.
2667
2668 // Here we look for equal elements if the centre is more than 5/8 the length.
2669 // 5/8 = 1/2 + 1/8. Pivots must be different.
2670 if ((great - less) > (n >>> 1) + (n >>> 3) && v1 != v2) {
2671
2672 // Fast-forward to reduce swaps. Changes inclusive ends to exclusive ends.
2673 // Since v1 != v2 these act as sentinels to prevent overrun.
2674 do {
2675 ++less;
2676 } while (a[less] == v1);
2677 do {
2678 --great;
2679 } while (a[great] == v2);
2680
2681 // This copies the logic in the sorting loop using == comparisons
2682 EQUAL:
2683 for (int k = less - 1; ++k <= great;) {
2684 final int v = a[k];
2685 if (v == v1) {
2686 a[k] = a[less];
2687 a[less] = v;
2688 less++;
2689 } else if (v == v2) {
2690 while (a[great] == v2) {
2691 if (great-- == k) {
2692 // Done
2693 break EQUAL;
2694 }
2695 }
2696 final int w = a[great];
2697 a[great] = v;
2698 great--;
2699 if (w == v1) {
2700 a[k] = a[less];
2701 a[less] = w;
2702 less++;
2703 } else {
2704 a[k] = w;
2705 }
2706 }
2707 }
2708
2709 // Change to inclusive ends
2710 less--;
2711 great++;
2712 }
2713
2714 // Between pivots in (less, great)
2715 if (v1 != v2 && less < great - 1) {
2716 // Record the pivot end points
2717 bounds[0] = less;
2718 bounds[1] = great;
2719 } else {
2720 // No unsorted internal region (set k1 = k3; k2 = k0)
2721 bounds[0] = bounds[2];
2722 bounds[1] = lower;
2723 }
2724
2725 return lower;
2726 }
2727
2728 /**
2729 * Map the distance from the edge of {@code [l, r]} to a new distance in {@code [0, n)}.
2730 *
2731 * <p>The provides the adaption {@code kf'/|A|} from Alexandrescu (2016) where
2732 * {@code k == d}, {@code f' == n} and {@code |A| == r-l+1}.
2733 *
2734 * <p>For convenience this accepts the input range {@code [l, r]}.
2735 *
2736 * @param d Distance from the edge in {@code [0, r - l]}.
2737 * @param l Lower bound (inclusive).
2738 * @param r Upper bound (inclusive).
2739 * @param n Size of the new range.
2740 * @return the mapped distance in [0, n)
2741 */
2742 private static int mapDistance(int d, int l, int r, int n) {
2743 return (int) (d * (n - 1.0) / (r - l));
2744 }
2745
2746 /**
2747 * Configure the dual-pivot control flags. This packs the maximum recursion depth and
2748 * sort select size into a single integer.
2749 *
2750 * @param left Lower bound (inclusive).
2751 * @param right Upper bound (inclusive).
2752 * @param k1 First key of interest.
2753 * @param kn Last key of interest.
2754 * @return the flags
2755 */
2756 private static int dualPivotFlags(int left, int right, int k1, int kn) {
2757 final int maxDepth = dualPivotMaxDepth(right - left);
2758 final int ss = dualPivotSortSelectSize(k1, kn);
2759 return dualPivotFlags(maxDepth, ss);
2760 }
2761
2762 /**
2763 * Configure the dual-pivot control flags. This packs the maximum recursion depth and
2764 * sort select size into a single integer.
2765 *
2766 * @param maxDepth Maximum recursion depth.
2767 * @param ss Sort select size.
2768 * @return the flags
2769 */
2770 static int dualPivotFlags(int maxDepth, int ss) {
2771 // The flags are packed using the upper bits to count back from -1 in
2772 // step sizes. The lower bits pack the sort select size.
2773 int flags = Integer.MIN_VALUE - maxDepth * RECURSION_INCREMENT;
2774 flags &= ~SORTSELECT_MASK;
2775 return flags | ss;
2776 }
2777
2778 /**
2779 * Compute the maximum recursion depth for dual pivot recursion.
2780 * This is an approximation to {@code 2 * log3 (x)}.
2781 *
2782 * <p>The result is between {@code 2*floor(log3(x))} and {@code 2*ceil(log3(x))}.
2783 * The result is correctly rounded when {@code x +/- 1} is a power of 3.
2784 *
2785 * @param x Value.
2786 * @return maximum recursion depth
2787 */
2788 static int dualPivotMaxDepth(int x) {
2789 // log3(2) ~ 1.5849625
2790 // log3(x) ~ log2(x) * 0.630929753... ~ log2(x) * 323 / 512 (0.630859375)
2791 // Use (floor(log2(x))+1) * 323 / 256
2792 return ((32 - Integer.numberOfLeadingZeros(x)) * 323) >>> 8;
2793 }
2794
2795 /**
2796 * Configure the sort select size for dual pivot partitioning.
2797 *
2798 * @param k1 First key of interest.
2799 * @param kn Last key of interest.
2800 * @return the sort select size.
2801 */
2802 private static int dualPivotSortSelectSize(int k1, int kn) {
2803 // Configure the sort select size based on the index density
2804 // l---k1---k---k-----k--k------kn----r
2805 //
2806 // For a full sort the dual-pivot quicksort can switch to insertion sort
2807 // when the length is small. The optimum value is dependent on the
2808 // hardware and the insertion sort implementation. Benchmarks show that
2809 // insertion sort can be used at length 80-120.
2810 //
2811 // During selection the SORTSELECT_SIZE specifies the distance from the edge
2812 // to use sort select. When keys are not dense there may be a small length
2813 // that is ignored by sort select due to the presence of another key.
2814 // Diagram of k-l = SORTSELECT_SIZE and r-k < SORTSELECT_SIZE where a second
2815 // key b is blocking the use of sort select. The key b is closest it can be to the right
2816 // key to enable blocking; it could be further away (up to k = left).
2817 //
2818 // |--SORTSELECT_SIZE--|
2819 // |--SORTSELECT_SIZE--|
2820 // l--b----------------k--r
2821 // l----b--------------k----r
2822 // l------b------------k------r
2823 // l--------b----------k--------r
2824 // l----------b--------k----------r
2825 // l------------b------k------------r
2826 // l--------------b----k--------------r
2827 // l----------------b--k----------------r
2828 // l------------------bk------------------r
2829 // |--SORTSELECT_SIZE--|
2830 //
2831 // For all these cases the partitioning method would have to run. Assuming ideal
2832 // dual-pivot partitioning into thirds, and that the left key is randomly positioned
2833 // in [left, k) it is more likely that after partitioning 2 partitions will have to
2834 // be processed rather than 1 partition. In this case the options are:
2835 // - split the range using partitioning; sort select next iteration
2836 // - use sort select with a edge distance above the optimum length for single k selection
2837 //
2838 // Contrast with a longer length:
2839 // |--SORTSELECT_SIZE--|
2840 // l-------------------k-----k-------k-------------------r
2841 // |--SORTSELECT_SIZE--|
2842 // Here partitioning has to run and 1, 2, or 3 partitions processed. But all k can
2843 // be found with a sort. In this case sort select could be used with a much higher
2844 // length (e.g. 80 - 120).
2845 //
2846 // When keys are extremely sparse (never within SORTSELECT_SIZE) then no switch
2847 // to sort select based on length is *required*. It may still be beneficial to avoid
2848 // partitioning if the length is very small due to the overhead of partitioning.
2849 //
2850 // Benchmarking with different lengths for a switch to sort select show inconsistent
2851 // behaviour across platforms due to the variable speed of insertion sort at longer
2852 // lengths. Attempts to transition the length based on various ramps schemes can
2853 // be incorrect and result is a slowdown rather than speed-up (if the correct threshold
2854 // is not chosen).
2855 //
2856 // Here we use a much simpler scheme based on these observations:
2857 // - If the average separation is very high then no length will collect extra indices
2858 // from a sort select over the current trigger of using the distance from the end. But
2859 // using a length dependence will not effect the work done by sort select as it only
2860 // performs the minimum sorting required.
2861 // - If the average separation is within the SORTSELECT_SIZE then a round of
2862 // partitioning will create multiple regions that all require a sort selection.
2863 // Thus a partitioning round can be avoided if the length is small.
2864 // - If the keys are at the end with nothing in between then partitioning will be able
2865 // to split them but a sort will have to sort the entire range:
2866 // lk-------------------------------kr
2867 // After partitioning starts the chance of keys being at the ends is low as keys
2868 // should be random within the divided range.
2869 // - Extremely high density keys is rare. It is only expected to saturate the range
2870 // with short lengths, e.g. 100 quantiles for length 1000 = separation 10 (high density)
2871 // but for length 10000 = separation 100 (low density).
2872 // - The density of (non-uniform) keys is hard to predict without complex analysis.
2873 //
2874 // Benchmarking using random keys at various density show no performance loss from
2875 // using a fixed size for the length dependence of sort select, if the size is small.
2876 // A large length can impact performance with low density keys, and on machines
2877 // where insertion sort is slower. Extreme performance gains occur when the average
2878 // separation of random keys is below 8-16, or of uniform keys around 32, by using a
2879 // sort at lengths up to 90. But this threshold shows performance loss greater than
2880 // the gains with separation of 64-128 on random keys, and on machines with slow
2881 // insertion sort. The transition to using an insertion sort of a longer length
2882 // is difficult to predict for all situations.
2883
2884 // Let partitioning run if the initial length is small.
2885 // Use kn - k1 as a proxy for the length. If length is actually very large then
2886 // the final selection is insignificant. This avoids slowdown for small lengths
2887 // where the keys may only be at the ends. Note ideal dual-pivot partitioning
2888 // creates thirds so 1 iteration on SORTSELECT_SIZE * 3 should create
2889 // SORTSELECT_SIZE partitions.
2890 if (kn - k1 < DP_SORTSELECT_SIZE * 3) {
2891 return 0;
2892 }
2893 // Here partitioning will run at least once.
2894 // Stable performance across platforms using a modest length dependence.
2895 return DP_SORTSELECT_SIZE * 2;
2896 }
2897 }