1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.commons.numbers.arrays; 19 20 /** 21 * Partition array data. 22 * 23 * <p>Note: Requires that the floating-point data contains no NaN values; sorting does not 24 * respect the order of signed zeros imposed by {@link Double#compare(double, double)}; 25 * mixed signed zeros may be destroyed (the mixture updated during partitioning). The 26 * caller is responsible for counting a mixture of signed zeros and restoring them if 27 * required. 28 * 29 * @see Selection 30 * @since 1.2 31 */ 32 final class QuickSelect { 33 // Implementation Notes 34 // 35 // Selection is performed using a quickselect variant to recursively divide the range 36 // to select the target index, or indices. Partition sizes or recursion are monitored 37 // will fall-backs on poor convergence of a linearselect (single index) or heapselect. 38 // 39 // Many implementations were tested, each with strengths and weaknesses on different 40 // input data containing random elements, repeat elements, elements with repeat 41 // patterns, and constant elements. The final implementation performs well across data 42 // types for single and multiple indices with no obvious weakness. 43 // See: o.a.c.numbers.examples.jmh.arrays for benchmarking implementations. 44 // 45 // Single indices are selected using a quickselect adaptive method based on Alexandrescu. 46 // The algorithm is a quickselect around a pivot identified using a 47 // sample-of-sample-of-samples created from the entire range data. This pivot will 48 // have known lower and upper margins and ensures elimination of a minimum fraction of 49 // data at each step. To increase speed the pivot can be identified using less of the data 50 // but without margin guarantees (sampling mode). The algorithm monitors partition sizes 51 // against the known margins. If the reduction in the partition size is not large enough 52 // then the algorithm can disable sampling mode and ensure linear performance by removing 53 // a set fraction of the data each iteration. 54 // 55 // Modifications from Alexandrescu are: 56 // 1. Initialise sampling mode using the Floyd-Rivest (FR) SELECT algorithm. 57 // 2. Adaption is adjusted to force use of the lower margin in the far-step method when 58 // sampling is disabled. 59 // 3. Change the far-step method to a min-of-4 then median-of-3 into the 2nd 12th-tile. 60 // The original method uses a lower-median-of-4, min-of-3 into the 4th 12th-tile. 61 // 4. Position the sample around the target k when in sampling mode for the non-far-step 62 // methods. 63 // 64 // The far step method is used when target k is within 1/12 of the end of the data A. 65 // The differences in the far-step method are: 66 // - The upper margin when not sampling is 8/24 vs. 9/24; the lower margin remains at 1/12. 67 // - The position of the sample is closer to the expected location of k < |A|/12. 68 // - Sampling mode uses a median-of-3 with adaptive k, matching the other step methods. 69 // Note the original min-of-3 sample is more likely to create a pivot too small if used 70 // with adaption leaving k in the larger partition and a wasted iteration. 71 // 72 // The Floyd-Rivest (FR) SELECT algorithm is preferred for sampling over using quickselect 73 // adaptive sampling. It uses a smaller sample and has improved heuristics to place the sample 74 // pivot. However the FR sample is a small range of the data and pivot selection can be poor 75 // if the sample is not representative. This can be mitigated by creating a random sample 76 // of the entire range for the pivot selection. This implementation does not use random 77 // sampling for the FR mode. Performance is identical on random data (randomisation is a 78 // negligible overhead) and faster on sorted data. Any data not suitable for the FR algorithm 79 // are immediately switched to the quickselect adaptive algorithm with sampling. Performance 80 // across a range of data shows this strategy is approximately mid-way in performance between 81 // FR with random sampling, and quickselect adaptive in sampling mode. The benefit is that 82 // sorted or partially partitioned data are not intentionally unordered as the method will 83 // only move elements known to be incorrectly placed in the array. 84 // 85 // Multiple indices are selected using a dual-pivot partition method by 86 // Yaroslavskiy to divide the interval containing the indices. Recursion is performed for 87 // regions containing target indices. The method is thus a partial quicksort into regions of 88 // interest. Excess recursion triggers use of a heapselect. When indices are effectively 89 // a single index the method can switch to the single index selection to use the FR algorithm. 90 // 91 // Alternative schemes to partition multiple indices are to repeat call single index select 92 // with cached pivots, or without cached pivots if processing indices in order as the previous 93 // index brackets the range for the next search. Caching pivots is the most effective 94 // alternative. It requires storing all pivots during select, and using the cache to look-up 95 // the search bounds (sub-range) for each target index. This requires 2n searches for n indices. 96 // All pivots must be stored to avoid destroying previously partitioned data on repeat entry 97 // to the array. The current scheme inverts this by requiring at most n-1 divides of the 98 // indices during recursion and has the advantage of tracking recursion depth during selection 99 // for each sub-range. Division of indices is a small overhead for the common case where 100 // the number of indices is far smaller than the size of the data. 101 // 102 // Dual-pivot partitioning adapted from Yaroslavskiy 103 // http://codeblab.com/wp-content/uploads/2009/09/DualPivotQuicksort.pdf 104 // 105 // Modified to allow partial sorting (partitioning): 106 // - Ignore insertion sort for tiny array (handled by calling code). 107 // - Ignore recursive calls for a full sort (handled by calling code). 108 // - Change to fast-forward over initial ascending / descending runs. 109 // - Change to fast-forward great when v > v2 and either break the sorting 110 // loop, or move a[great] direct to the correct location. 111 // - Change to use the 2nd and 4th of 5 elements for the pivots. 112 // - Identify a large central region using ~5/8 of the length to trigger search for 113 // equal values. 114 // 115 // For some indices and data a full sort of the data will be faster; this is impossible to 116 // predict on unknown data input and attempts to analyse the indices and data impact 117 // performance for the majority of use cases where sorting is not a suitable choice. 118 // Use of the sortselect finisher allows the current multiple indices method to degrade 119 // to a (non-optimised) dual-pivot quicksort (see below). 120 // 121 // heapselect vs sortselect 122 // 123 // Quickselect can switch to an alternative when: the range is very small 124 // (e.g. insertion sort); or the target index is close to the end (e.g. heapselect). 125 // Small ranges and a target index close to the end are handled using a hybrid of insertion 126 // sort and selection (sortselect). This is faster than heapselect for small distance from 127 // the edge (m) for a single index and has the advantage of sorting all upstream values from 128 // the target index (heapselect requires push-down of each successive value to sort). This 129 // allows the dual-pivot quickselect on multiple indices that saturate the range to degrade 130 // to a (non-optimised) dual-pivot quicksort. However sortselect is worst case Order(m * (r-l)) 131 // for range [l, r] so cannot be used when quickselect fails to converge as m may be very large. 132 // Thus heapselect is used as the stopper algorithm when quickselect progress is slow on 133 // multiple indices. If heapselect is used for small range handling the performance on 134 // saturated indices is significantly slower. Hence the presence of two final selection 135 // methods for different purposes. 136 137 /** Sampling mode using Floyd-Rivest sampling. */ 138 static final int MODE_FR_SAMPLING = -1; 139 /** Sampling mode. */ 140 static final int MODE_SAMPLING = 0; 141 /** No sampling but use adaption of the target k. */ 142 static final int MODE_ADAPTION = 1; 143 /** No sampling and no adaption of target k (strict margins). */ 144 static final int MODE_STRICT = 2; 145 146 /** Minimum size for sortselect. 147 * Below this perform a sort rather than selection. This is used to avoid 148 * sort select on tiny data. */ 149 private static final int MIN_SORTSELECT_SIZE = 4; 150 /** Single-pivot sortselect size for quickselect adaptive. Note that quickselect adaptive 151 * recursively calls quickselect so very small lengths are included with an initial medium 152 * length. Using lengths of 1023-5 and 2043-53 indicate optimum performance around 20-30. 153 * Note: The expand partition function assumes a sample of at least length 2 as each end 154 * of the sample is used as a sentinel; this imposes a minimum length of 24 on the range 155 * to ensure it contains a 12-th tile of length 2. Thus the absolute minimum for the 156 * distance from the edge is 12. */ 157 private static final int LINEAR_SORTSELECT_SIZE = 24; 158 /** Dual-pivot sortselect size for the distance of a single k from the edge of the 159 * range length n. Benchmarking in range [81+81, 243+243] suggests a value of ~20 (or 160 * higher on some hardware). Ranges are chosen based on third interval spacing between 161 * powers of 3. 162 * 163 * <p>Sortselect is faster at this small size than heapselect. A second advantage is 164 * that all indices closer to the edge than the target index are also sorted. This 165 * allows selection of multiple close indices to be performed with effectively the 166 * same speed. High density indices will result in recursion to very short fragments 167 * which also trigger use of sort select. The threshold for sorting short lengths is 168 * configured in {@link #dualPivotSortSelectSize(int, int)}. */ 169 private static final int DP_SORTSELECT_SIZE = 20; 170 /** Threshold to use Floyd-Rivest sub-sampling. This partitions a sample of the data to 171 * identify a pivot so that the target element is in the smaller set after partitioning. 172 * The original FR paper used 600 otherwise reverted to the target index as the pivot. 173 * This implementation reverts to quickselect adaptive which increases robustness 174 * at small size on a variety of data and allows raising the original FR threshold. */ 175 private static final int FR_SAMPLING_SIZE = 1200; 176 177 /** Increment used for the recursion counter. The counter will overflow to negative when 178 * recursion has exceeded the maximum level. The counter is maintained in the upper bits 179 * of the dual-pivot control flags. */ 180 private static final int RECURSION_INCREMENT = 1 << 20; 181 /** Mask to extract the sort select size from the dual-pivot control flags. Currently 182 * the bits below those used for the recursion counter are only used for the sort select size 183 * so this can use a mask with all bits below the increment. */ 184 private static final int SORTSELECT_MASK = RECURSION_INCREMENT - 1; 185 186 /** Threshold to use repeated step left: 7 / 16. */ 187 private static final double STEP_LEFT = 0.4375; 188 /** Threshold to use repeated step right: 9 / 16. */ 189 private static final double STEP_RIGHT = 0.5625; 190 /** Threshold to use repeated step far-left: 1 / 12. */ 191 private static final double STEP_FAR_LEFT = 0.08333333333333333; 192 /** Threshold to use repeated step far-right: 11 / 12. */ 193 private static final double STEP_FAR_RIGHT = 0.9166666666666666; 194 195 /** No instances. */ 196 private QuickSelect() {} 197 198 /** 199 * Partition the elements between {@code ka} and {@code kb} using a heap select 200 * algorithm. It is assumed {@code left <= ka <= kb <= right}. 201 * 202 * @param a Data array to use to find out the K<sup>th</sup> value. 203 * @param left Lower bound (inclusive). 204 * @param right Upper bound (inclusive). 205 * @param ka Lower index to select. 206 * @param kb Upper index to select. 207 */ 208 static void heapSelect(double[] a, int left, int right, int ka, int kb) { 209 if (right <= left) { 210 return; 211 } 212 // Use the smallest heap 213 if (kb - left < right - ka) { 214 heapSelectLeft(a, left, right, ka, kb); 215 } else { 216 heapSelectRight(a, left, right, ka, kb); 217 } 218 } 219 220 /** 221 * Partition the elements between {@code ka} and {@code kb} using a heap select 222 * algorithm. It is assumed {@code left <= ka <= kb <= right}. 223 * 224 * <p>For best performance this should be called with {@code k} in the lower 225 * half of the range. 226 * 227 * @param a Data array to use to find out the K<sup>th</sup> value. 228 * @param left Lower bound (inclusive). 229 * @param right Upper bound (inclusive). 230 * @param ka Lower index to select. 231 * @param kb Upper index to select. 232 */ 233 static void heapSelectLeft(double[] a, int left, int right, int ka, int kb) { 234 // Create a max heap in-place in [left, k], rooted at a[left] = max 235 // |l|-max-heap-|k|--------------| 236 // Build the heap using Floyd's heap-construction algorithm for heap size n. 237 // Start at parent of the last element in the heap (k), 238 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = k - left 239 int end = kb + 1; 240 for (int p = left + ((kb - left - 1) >> 1); p >= left; p--) { 241 maxHeapSiftDown(a, a[p], p, left, end); 242 } 243 // Scan the remaining data and insert 244 // Mitigate worst case performance on descending data by backward sweep 245 double max = a[left]; 246 for (int i = right + 1; --i > kb;) { 247 final double v = a[i]; 248 if (v < max) { 249 a[i] = max; 250 maxHeapSiftDown(a, v, left, left, end); 251 max = a[left]; 252 } 253 } 254 // Partition [ka, kb] 255 // |l|-max-heap-|k|--------------| 256 // | <-swap-> | then sift down reduced size heap 257 // Avoid sifting heap of size 1 258 final int last = Math.max(left, ka - 1); 259 while (--end > last) { 260 maxHeapSiftDown(a, a[end], left, left, end); 261 a[end] = max; 262 max = a[left]; 263 } 264 } 265 266 /** 267 * Sift the element down the max heap. 268 * 269 * <p>Assumes {@code root <= p < end}, i.e. the max heap is above root. 270 * 271 * @param a Heap data. 272 * @param v Value to sift. 273 * @param p Start position. 274 * @param root Root of the heap. 275 * @param end End of the heap (exclusive). 276 */ 277 private static void maxHeapSiftDown(double[] a, double v, int p, int root, int end) { 278 // child2 = root + 2 * (parent - root) + 2 279 // = 2 * parent - root + 2 280 while (true) { 281 // Right child 282 int c = (p << 1) - root + 2; 283 if (c > end) { 284 // No left child 285 break; 286 } 287 // Use the left child if right doesn't exist, or it is greater 288 if (c == end || a[c] < a[c - 1]) { 289 --c; 290 } 291 if (v >= a[c]) { 292 // Parent greater than largest child - done 293 break; 294 } 295 // Swap and descend 296 a[p] = a[c]; 297 p = c; 298 } 299 a[p] = v; 300 } 301 302 /** 303 * Partition the elements between {@code ka} and {@code kb} using a heap select 304 * algorithm. It is assumed {@code left <= ka <= kb <= right}. 305 * 306 * <p>For best performance this should be called with {@code k} in the upper 307 * half of the range. 308 * 309 * @param a Data array to use to find out the K<sup>th</sup> value. 310 * @param left Lower bound (inclusive). 311 * @param right Upper bound (inclusive). 312 * @param ka Lower index to select. 313 * @param kb Upper index to select. 314 */ 315 static void heapSelectRight(double[] a, int left, int right, int ka, int kb) { 316 // Create a min heap in-place in [k, right], rooted at a[right] = min 317 // |--------------|k|-min-heap-|r| 318 // Build the heap using Floyd's heap-construction algorithm for heap size n. 319 // Start at parent of the last element in the heap (k), 320 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = right - k 321 int end = ka - 1; 322 for (int p = right - ((right - ka - 1) >> 1); p <= right; p++) { 323 minHeapSiftDown(a, a[p], p, right, end); 324 } 325 // Scan the remaining data and insert 326 // Mitigate worst case performance on descending data by backward sweep 327 double min = a[right]; 328 for (int i = left - 1; ++i < ka;) { 329 final double v = a[i]; 330 if (v > min) { 331 a[i] = min; 332 minHeapSiftDown(a, v, right, right, end); 333 min = a[right]; 334 } 335 } 336 // Partition [ka, kb] 337 // |--------------|k|-min-heap-|r| 338 // | <-swap-> | then sift down reduced size heap 339 // Avoid sifting heap of size 1 340 final int last = Math.min(right, kb + 1); 341 while (++end < last) { 342 minHeapSiftDown(a, a[end], right, right, end); 343 a[end] = min; 344 min = a[right]; 345 } 346 } 347 348 /** 349 * Sift the element down the min heap. 350 * 351 * <p>Assumes {@code root >= p > end}, i.e. the max heap is below root. 352 * 353 * @param a Heap data. 354 * @param v Value to sift. 355 * @param p Start position. 356 * @param root Root of the heap. 357 * @param end End of the heap (exclusive). 358 */ 359 private static void minHeapSiftDown(double[] a, double v, int p, int root, int end) { 360 // child2 = root - 2 * (root - parent) - 2 361 // = 2 * parent - root - 2 362 while (true) { 363 // Right child 364 int c = (p << 1) - root - 2; 365 if (c < end) { 366 // No left child 367 break; 368 } 369 // Use the left child if right doesn't exist, or it is less 370 if (c == end || a[c] > a[c + 1]) { 371 ++c; 372 } 373 if (v <= a[c]) { 374 // Parent less than smallest child - done 375 break; 376 } 377 // Swap and descend 378 a[p] = a[c]; 379 p = c; 380 } 381 a[p] = v; 382 } 383 384 /** 385 * Partition the elements between {@code ka} and {@code kb} using a sort select 386 * algorithm. It is assumed {@code left <= ka <= kb <= right}. 387 * 388 * @param a Data array to use to find out the K<sup>th</sup> value. 389 * @param left Lower bound (inclusive). 390 * @param right Upper bound (inclusive). 391 * @param ka Lower index to select. 392 * @param kb Upper index to select. 393 */ 394 static void sortSelect(double[] a, int left, int right, int ka, int kb) { 395 // Combine the test for right <= left with 396 // avoiding the overhead of sort select on tiny data. 397 if (right - left <= MIN_SORTSELECT_SIZE) { 398 Sorting.sort(a, left, right); 399 return; 400 } 401 // Sort the smallest side 402 if (kb - left < right - ka) { 403 sortSelectLeft(a, left, right, kb); 404 } else { 405 sortSelectRight(a, left, right, ka); 406 } 407 } 408 409 /** 410 * Partition the minimum {@code n} elements below {@code k} where 411 * {@code n = k - left + 1}. Uses an insertion sort algorithm. 412 * 413 * <p>Works with any {@code k} in the range {@code left <= k <= right} 414 * and performs a full sort of the range below {@code k}. 415 * 416 * <p>For best performance this should be called with 417 * {@code k - left < right - k}, i.e. 418 * to partition a value in the lower half of the range. 419 * 420 * @param a Data array to use to find out the K<sup>th</sup> value. 421 * @param left Lower bound (inclusive). 422 * @param right Upper bound (inclusive). 423 * @param k Index to select. 424 */ 425 static void sortSelectLeft(double[] a, int left, int right, int k) { 426 // Sort 427 for (int i = left; ++i <= k;) { 428 final double v = a[i]; 429 // Move preceding higher elements above (if required) 430 if (v < a[i - 1]) { 431 int j = i; 432 while (--j >= left && v < a[j]) { 433 a[j + 1] = a[j]; 434 } 435 a[j + 1] = v; 436 } 437 } 438 // Scan the remaining data and insert 439 // Mitigate worst case performance on descending data by backward sweep 440 double m = a[k]; 441 for (int i = right + 1; --i > k;) { 442 final double v = a[i]; 443 if (v < m) { 444 a[i] = m; 445 int j = k; 446 while (--j >= left && v < a[j]) { 447 a[j + 1] = a[j]; 448 } 449 a[j + 1] = v; 450 m = a[k]; 451 } 452 } 453 } 454 455 /** 456 * Partition the maximum {@code n} elements above {@code k} where 457 * {@code n = right - k + 1}. Uses an insertion sort algorithm. 458 * 459 * <p>Works with any {@code k} in the range {@code left <= k <= right} 460 * and can be used to perform a full sort of the range above {@code k}. 461 * 462 * <p>For best performance this should be called with 463 * {@code k - left > right - k}, i.e. 464 * to partition a value in the upper half of the range. 465 * 466 * @param a Data array to use to find out the K<sup>th</sup> value. 467 * @param left Lower bound (inclusive). 468 * @param right Upper bound (inclusive). 469 * @param k Index to select. 470 */ 471 static void sortSelectRight(double[] a, int left, int right, int k) { 472 // Sort 473 for (int i = right; --i >= k;) { 474 final double v = a[i]; 475 // Move succeeding lower elements below (if required) 476 if (v > a[i + 1]) { 477 int j = i; 478 while (++j <= right && v > a[j]) { 479 a[j - 1] = a[j]; 480 } 481 a[j - 1] = v; 482 } 483 } 484 // Scan the remaining data and insert 485 // Mitigate worst case performance on descending data by backward sweep 486 double m = a[k]; 487 for (int i = left - 1; ++i < k;) { 488 final double v = a[i]; 489 if (v > m) { 490 a[i] = m; 491 int j = k; 492 while (++j <= right && v > a[j]) { 493 a[j - 1] = a[j]; 494 } 495 a[j - 1] = v; 496 m = a[k]; 497 } 498 } 499 } 500 501 /** 502 * Partition the array such that index {@code k} corresponds to its correctly 503 * sorted value in the equivalent fully sorted array. 504 * 505 * <p>Assumes {@code k} is a valid index into [left, right]. 506 * 507 * @param a Values. 508 * @param left Lower bound of data (inclusive). 509 * @param right Upper bound of data (inclusive). 510 * @param k Index. 511 */ 512 static void select(double[] a, int left, int right, int k) { 513 quickSelectAdaptive(a, left, right, k, k, new int[1], MODE_FR_SAMPLING); 514 } 515 516 /** 517 * Partition the array such that indices {@code k} correspond to their correctly 518 * sorted value in the equivalent fully sorted array. 519 * 520 * <p>The count of the number of used indices is returned. If the keys are sorted in-place, 521 * the count is returned as a negative. 522 * 523 * @param a Values. 524 * @param left Lower bound of data (inclusive). 525 * @param right Upper bound of data (inclusive). 526 * @param k Indices (may be destructively modified). 527 * @param n Count of indices. 528 * @return the count of used indices 529 * @throws IndexOutOfBoundsException if any index {@code k} is not within the 530 * sub-range {@code [left, right]} 531 */ 532 static int select(double[] a, int left, int right, int[] k, int n) { 533 if (n < 1) { 534 return 0; 535 } 536 if (n == 1) { 537 quickSelectAdaptive(a, left, right, k[0], k[0], new int[1], MODE_FR_SAMPLING); 538 return -1; 539 } 540 541 // Interval creation validates the indices are in [left, right] 542 final UpdatingInterval keys = IndexSupport.createUpdatingInterval(left, right, k, n); 543 544 // Save number of used indices 545 final int count = IndexSupport.countIndices(keys, n); 546 547 // Note: If the keys are not separated then they are effectively a single key. 548 // Any split of keys separated by the sort select size 549 // will be finished on the next iteration. 550 final int k1 = keys.left(); 551 final int kn = keys.right(); 552 if (kn - k1 < DP_SORTSELECT_SIZE) { 553 quickSelectAdaptive(a, left, right, k1, kn, new int[1], MODE_FR_SAMPLING); 554 } else { 555 // Dual-pivot mode with small range sort length configured using index density 556 dualPivotQuickSelect(a, left, right, keys, dualPivotFlags(left, right, k1, kn)); 557 } 558 return count; 559 } 560 561 /** 562 * Partition the array such that indices {@code k} correspond to their 563 * correctly sorted value in the equivalent fully sorted array. 564 * 565 * <p>For all indices {@code [ka, kb]} and any index {@code i}: 566 * 567 * <pre>{@code 568 * data[i < ka] <= data[ka] <= data[kb] <= data[kb < i] 569 * }</pre> 570 * 571 * <p>This function accepts indices {@code [ka, kb]} that define the 572 * range of indices to partition. It is expected that the range is small. 573 * 574 * <p>The {@code flags} are used to control the sampling mode and adaption of 575 * the index within the sample. 576 * 577 * <p>Returns the bounds containing {@code [ka, kb]}. These may be lower/higher 578 * than the keys if equal values are present in the data. 579 * 580 * @param a Values. 581 * @param left Lower bound of data (inclusive, assumed to be strictly positive). 582 * @param right Upper bound of data (inclusive, assumed to be strictly positive). 583 * @param ka First key of interest. 584 * @param kb Last key of interest. 585 * @param bounds Upper bound of the range containing {@code [ka, kb]} (inclusive). 586 * @param flags Adaption flags. 587 * @return Lower bound of the range containing {@code [ka, kb]} (inclusive). 588 */ 589 static int quickSelectAdaptive(double[] a, int left, int right, int ka, int kb, 590 int[] bounds, int flags) { 591 int l = left; 592 int r = right; 593 int m = flags; 594 while (true) { 595 // Select when ka and kb are close to the same end 596 // |l|-----|ka|kkkkkkkk|kb|------|r| 597 if (Math.min(kb - l, r - ka) < LINEAR_SORTSELECT_SIZE) { 598 sortSelect(a, l, r, ka, kb); 599 bounds[0] = kb; 600 return ka; 601 } 602 603 // Only target ka; kb is assumed to be close 604 int p0; 605 final int n = r - l; 606 // f in [0, 1] 607 final double f = (double) (ka - l) / n; 608 // Record the larger margin (start at 1/4) to create the estimated size. 609 // step L R 610 // far left 1/12 1/3 (use 1/4 + 1/32 + 1/64 ~ 0.328) 611 // left 1/6 1/4 612 // middle 2/9 2/9 (use 1/4 - 1/32 ~ 0.219) 613 int margin = n >> 2; 614 if (m < MODE_SAMPLING && r - l > FR_SAMPLING_SIZE) { 615 // Floyd-Rivest sample step uses the same margins 616 p0 = sampleStep(a, l, r, ka, bounds); 617 if (f <= STEP_FAR_LEFT || f >= STEP_FAR_RIGHT) { 618 margin += (n >> 5) + (n >> 6); 619 } else if (f > STEP_LEFT && f < STEP_RIGHT) { 620 margin -= n >> 5; 621 } 622 } else if (f <= STEP_LEFT) { 623 if (f <= STEP_FAR_LEFT) { 624 margin += (n >> 5) + (n >> 6); 625 p0 = repeatedStepFarLeft(a, l, r, ka, bounds, m); 626 } else { 627 p0 = repeatedStepLeft(a, l, r, ka, bounds, m); 628 } 629 } else if (f >= STEP_RIGHT) { 630 if (f >= STEP_FAR_RIGHT) { 631 margin += (n >> 5) + (n >> 6); 632 p0 = repeatedStepFarRight(a, l, r, ka, bounds, m); 633 } else { 634 p0 = repeatedStepRight(a, l, r, ka, bounds, m); 635 } 636 } else { 637 margin -= n >> 5; 638 p0 = repeatedStep(a, l, r, ka, bounds, m); 639 } 640 641 // Note: Here we expect [ka, kb] to be small and splitting is unlikely. 642 // p0 p1 643 // |l|--|ka|kkkk|kb|--|P|-------------------|r| 644 // |l|----------------|P|--|ka|kkk|kb|------|r| 645 // |l|-----------|ka|k|P|k|kb|--------------|r| 646 final int p1 = bounds[0]; 647 if (kb < p0) { 648 // Entirely on left side 649 r = p0 - 1; 650 } else if (ka > p1) { 651 // Entirely on right side 652 l = p1 + 1; 653 } else { 654 // Pivot splits [ka, kb]. Expect ends to be close to the pivot and finish. 655 // Here we set the bounds for use after median-of-medians pivot selection. 656 // In the event there are many equal values this allows collecting those 657 // known to be equal together when moving around the medians sample. 658 if (kb > p1) { 659 sortSelectLeft(a, p1 + 1, r, kb); 660 bounds[0] = kb; 661 } 662 if (ka < p0) { 663 sortSelectRight(a, l, p0 - 1, ka); 664 p0 = ka; 665 } 666 return p0; 667 } 668 // Update mode based on target partition size 669 if (r - l > n - margin) { 670 m++; 671 } 672 } 673 } 674 675 /** 676 * Partition an array slice around a pivot. Partitioning exchanges array elements such 677 * that all elements smaller than pivot are before it and all elements larger than 678 * pivot are after it. 679 * 680 * <p>Partitions a Floyd-Rivest sample around a pivot offset so that the input {@code k} will 681 * fall in the smaller partition when the entire range is partitioned. 682 * 683 * <p>Assumes the range {@code r - l} is large. 684 * 685 * @param a Data array. 686 * @param l Lower bound (inclusive). 687 * @param r Upper bound (inclusive). 688 * @param k Target index. 689 * @param upper Upper bound (inclusive) of the pivot range. 690 * @return Lower bound (inclusive) of the pivot range. 691 */ 692 private static int sampleStep(double[] a, int l, int r, int k, int[] upper) { 693 // Floyd-Rivest: use SELECT recursively on a sample of size S to get an estimate 694 // for the (k-l+1)-th smallest element into a[k], biased slightly so that the 695 // (k-l+1)-th element is expected to lie in the smaller set after partitioning. 696 final int n = r - l + 1; 697 final int ith = k - l + 1; 698 final double z = Math.log(n); 699 // sample size = 0.5 * n^(2/3) 700 final double s = 0.5 * Math.exp(0.6666666666666666 * z); 701 final double sd = 0.5 * Math.sqrt(z * s * (n - s) / n) * Integer.signum(ith - (n >> 1)); 702 final int ll = Math.max(l, (int) (k - ith * s / n + sd)); 703 final int rr = Math.min(r, (int) (k + (n - ith) * s / n + sd)); 704 // Sample recursion restarts from [ll, rr] 705 final int p = quickSelectAdaptive(a, ll, rr, k, k, upper, MODE_FR_SAMPLING); 706 return expandPartition(a, l, r, ll, rr, p, upper[0], upper); 707 } 708 709 /** 710 * Partition an array slice around a pivot. Partitioning exchanges array elements such 711 * that all elements smaller than pivot are before it and all elements larger than 712 * pivot are after it. 713 * 714 * <p>Assumes the range {@code r - l >= 8}; the caller is responsible for selection on a smaller 715 * range. If using a 12th-tile for sampling then assumes {@code r - l >= 11}. 716 * 717 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm 718 * with the median of 3 then median of 3; the final sample is placed in the 719 * 5th 9th-tile; the pivot chosen from the sample is adaptive using the input {@code k}. 720 * 721 * <p>Given a pivot in the middle of the sample this has margins of 2/9 and 2/9. 722 * 723 * @param a Data array. 724 * @param l Lower bound (inclusive). 725 * @param r Upper bound (inclusive). 726 * @param k Target index. 727 * @param upper Upper bound (inclusive) of the pivot range. 728 * @param flags Control flags. 729 * @return Lower bound (inclusive) of the pivot range. 730 */ 731 private static int repeatedStep(double[] a, int l, int r, int k, int[] upper, int flags) { 732 // Adapted from Alexandrescu (2016), algorithm 8. 733 final int fp; 734 final int s; 735 int p; 736 if (flags <= MODE_SAMPLING) { 737 // Median into a 12th-tile 738 fp = (r - l + 1) / 12; 739 // Position the sample around the target k 740 s = k - mapDistance(k - l, l, r, fp); 741 p = k; 742 } else { 743 // i in tertile [3f':6f') 744 fp = (r - l + 1) / 9; 745 final int f3 = 3 * fp; 746 final int end = l + (f3 << 1); 747 for (int i = l + f3; i < end; i++) { 748 Sorting.sort3(a, i - f3, i, i + f3); 749 } 750 // 5th 9th-tile: [4f':5f') 751 s = l + (fp << 2); 752 // No adaption uses the middle to enforce strict margins 753 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1)); 754 } 755 final int e = s + fp - 1; 756 for (int i = s; i <= e; i++) { 757 Sorting.sort3(a, i - fp, i, i + fp); 758 } 759 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING); 760 return expandPartition(a, l, r, s, e, p, upper[0], upper); 761 } 762 763 /** 764 * Partition an array slice around a pivot. Partitioning exchanges array elements such 765 * that all elements smaller than pivot are before it and all elements larger than 766 * pivot are after it. 767 * 768 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller 769 * range. 770 * 771 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm 772 * with the lower median of 4 then either median of 3 with the final sample placed in the 773 * 5th 12th-tile, or min of 3 with the final sample in the 4th 12th-tile; 774 * the pivot chosen from the sample is adaptive using the input {@code k}. 775 * 776 * <p>Given a pivot in the middle of the sample this has margins of 1/6 and 1/4. 777 * 778 * @param a Data array. 779 * @param l Lower bound (inclusive). 780 * @param r Upper bound (inclusive). 781 * @param k Target index. 782 * @param upper Upper bound (inclusive) of the pivot range. 783 * @param flags Control flags. 784 * @return Lower bound (inclusive) of the pivot range. 785 */ 786 private static int repeatedStepLeft(double[] a, int l, int r, int k, int[] upper, int flags) { 787 // Adapted from Alexandrescu (2016), algorithm 9. 788 final int fp; 789 final int s; 790 int p; 791 if (flags <= MODE_SAMPLING) { 792 // Median into a 12th-tile 793 fp = (r - l + 1) / 12; 794 // Position the sample around the target k 795 // Avoid bounds error due to rounding as (k-l)/(r-l) -> 1/12 796 s = Math.max(k - mapDistance(k - l, l, r, fp), l + fp); 797 p = k; 798 } else { 799 // i in 2nd quartile 800 final int f = (r - l + 1) >> 2; 801 final int f2 = f + f; 802 final int end = l + f2; 803 for (int i = l + f; i < end; i++) { 804 Sorting.lowerMedian4(a, i - f, i, i + f, i + f2); 805 } 806 // i in 5th 12th-tile 807 fp = f / 3; 808 s = l + f + fp; 809 // No adaption uses the middle to enforce strict margins 810 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1)); 811 } 812 final int e = s + fp - 1; 813 for (int i = s; i <= e; i++) { 814 Sorting.sort3(a, i - fp, i, i + fp); 815 } 816 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING); 817 return expandPartition(a, l, r, s, e, p, upper[0], upper); 818 } 819 820 /** 821 * Partition an array slice around a pivot. Partitioning exchanges array elements such 822 * that all elements smaller than pivot are before it and all elements larger than 823 * pivot are after it. 824 * 825 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller 826 * range. 827 * 828 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm 829 * with the upper median of 4 then either median of 3 with the final sample placed in the 830 * 8th 12th-tile, or max of 3 with the final sample in the 9th 12th-tile; 831 * the pivot chosen from the sample is adaptive using the input {@code k}. 832 * 833 * <p>Given a pivot in the middle of the sample this has margins of 1/4 and 1/6. 834 * 835 * @param a Data array. 836 * @param l Lower bound (inclusive). 837 * @param r Upper bound (inclusive). 838 * @param k Target index. 839 * @param upper Upper bound (inclusive) of the pivot range. 840 * @param flags Control flags. 841 * @return Lower bound (inclusive) of the pivot range. 842 */ 843 private static int repeatedStepRight(double[] a, int l, int r, int k, int[] upper, int flags) { 844 // Mirror image repeatedStepLeft using upper median into 3rd quartile 845 final int fp; 846 final int e; 847 int p; 848 if (flags <= MODE_SAMPLING) { 849 // Median into a 12th-tile 850 fp = (r - l + 1) / 12; 851 // Position the sample around the target k 852 // Avoid bounds error due to rounding as (r-k)/(r-l) -> 11/12 853 e = Math.min(k + mapDistance(r - k, l, r, fp), r - fp); 854 p = k; 855 } else { 856 // i in 3rd quartile 857 final int f = (r - l + 1) >> 2; 858 final int f2 = f + f; 859 final int end = r - f2; 860 for (int i = r - f; i > end; i--) { 861 Sorting.upperMedian4(a, i - f2, i - f, i, i + f); 862 } 863 // i in 8th 12th-tile 864 fp = f / 3; 865 e = r - f - fp; 866 // No adaption uses the middle to enforce strict margins 867 p = e - (flags == MODE_ADAPTION ? mapDistance(r - k, l, r, fp) : (fp >>> 1)); 868 } 869 final int s = e - fp + 1; 870 for (int i = s; i <= e; i++) { 871 Sorting.sort3(a, i - fp, i, i + fp); 872 } 873 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING); 874 return expandPartition(a, l, r, s, e, p, upper[0], upper); 875 } 876 877 /** 878 * Partition an array slice around a pivot. Partitioning exchanges array elements such 879 * that all elements smaller than pivot are before it and all elements larger than 880 * pivot are after it. 881 * 882 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller 883 * range. 884 * 885 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm 886 * with the minimum of 4 then median of 3; the final sample is placed in the 887 * 2nd 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}. 888 * 889 * <p>Given a pivot in the middle of the sample this has margins of 1/12 and 1/3. 890 * 891 * @param a Data array. 892 * @param l Lower bound (inclusive). 893 * @param r Upper bound (inclusive). 894 * @param k Target index. 895 * @param upper Upper bound (inclusive) of the pivot range. 896 * @param flags Control flags. 897 * @return Lower bound (inclusive) of the pivot range. 898 */ 899 private static int repeatedStepFarLeft(double[] a, int l, int r, int k, int[] upper, int flags) { 900 // Far step has been changed from the Alexandrescu (2016) step of lower-median-of-4, min-of-3 901 // into the 4th 12th-tile to a min-of-4, median-of-3 into the 2nd 12th-tile. 902 // The differences are: 903 // - The upper margin when not sampling is 8/24 vs. 9/24; the lower margin remains at 1/12. 904 // - The position of the sample is closer to the expected location of k < |A| / 12. 905 // - Sampling mode uses a median-of-3 with adaptive k, matching the other step methods. 906 // A min-of-3 sample can create a pivot too small if used with adaption of k leaving 907 // k in the larger parition and a wasted iteration. 908 // - Adaption is adjusted to force use of the lower margin when not sampling. 909 final int fp; 910 final int s; 911 int p; 912 if (flags <= MODE_SAMPLING) { 913 // 2nd 12th-tile 914 fp = (r - l + 1) / 12; 915 s = l + fp; 916 // Use adaption 917 p = s + mapDistance(k - l, l, r, fp); 918 } else { 919 // i in 2nd quartile; min into i-f (1st quartile) 920 final int f = (r - l + 1) >> 2; 921 final int f2 = f + f; 922 final int end = l + f2; 923 for (int i = l + f; i < end; i++) { 924 if (a[i + f] < a[i - f]) { 925 final double u = a[i + f]; 926 a[i + f] = a[i - f]; 927 a[i - f] = u; 928 } 929 if (a[i + f2] < a[i]) { 930 final double v = a[i + f2]; 931 a[i + f2] = a[i]; 932 a[i] = v; 933 } 934 if (a[i] < a[i - f]) { 935 final double u = a[i]; 936 a[i] = a[i - f]; 937 a[i - f] = u; 938 } 939 } 940 // 2nd 12th-tile 941 fp = f / 3; 942 s = l + fp; 943 // Lower margin has 2(d+1) elements; d == (position in sample) - s 944 // Force k into the lower margin 945 p = s + ((k - l) >>> 1); 946 } 947 final int e = s + fp - 1; 948 for (int i = s; i <= e; i++) { 949 Sorting.sort3(a, i - fp, i, i + fp); 950 } 951 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING); 952 return expandPartition(a, l, r, s, e, p, upper[0], upper); 953 } 954 955 /** 956 * Partition an array slice around a pivot. Partitioning exchanges array elements such 957 * that all elements smaller than pivot are before it and all elements larger than 958 * pivot are after it. 959 * 960 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller 961 * range. 962 * 963 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm 964 * with the maximum of 4 then median of 3; the final sample is placed in the 965 * 11th 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}. 966 * 967 * <p>Given a pivot in the middle of the sample this has margins of 1/3 and 1/12. 968 * 969 * @param a Data array. 970 * @param l Lower bound (inclusive). 971 * @param r Upper bound (inclusive). 972 * @param k Target index. 973 * @param upper Upper bound (inclusive) of the pivot range. 974 * @param flags Control flags. 975 * @return Lower bound (inclusive) of the pivot range. 976 */ 977 private static int repeatedStepFarRight(double[] a, int l, int r, int k, int[] upper, int flags) { 978 // Mirror image repeatedStepFarLeft 979 final int fp; 980 final int e; 981 int p; 982 if (flags <= MODE_SAMPLING) { 983 // 11th 12th-tile 984 fp = (r - l + 1) / 12; 985 e = r - fp; 986 // Use adaption 987 p = e - mapDistance(r - k, l, r, fp); 988 } else { 989 // i in 3rd quartile; max into i+f (4th quartile) 990 final int f = (r - l + 1) >> 2; 991 final int f2 = f + f; 992 final int end = r - f2; 993 for (int i = r - f; i > end; i--) { 994 if (a[i - f] > a[i + f]) { 995 final double u = a[i - f]; 996 a[i - f] = a[i + f]; 997 a[i + f] = u; 998 } 999 if (a[i - f2] > a[i]) { 1000 final double v = a[i - f2]; 1001 a[i - f2] = a[i]; 1002 a[i] = v; 1003 } 1004 if (a[i] > a[i + f]) { 1005 final double u = a[i]; 1006 a[i] = a[i + f]; 1007 a[i + f] = u; 1008 } 1009 } 1010 // 11th 12th-tile 1011 fp = f / 3; 1012 e = r - fp; 1013 // Upper margin has 2(d+1) elements; d == e - (position in sample) 1014 // Force k into the upper margin 1015 p = e - ((r - k) >>> 1); 1016 } 1017 final int s = e - fp + 1; 1018 for (int i = s; i <= e; i++) { 1019 Sorting.sort3(a, i - fp, i, i + fp); 1020 } 1021 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING); 1022 return expandPartition(a, l, r, s, e, p, upper[0], upper); 1023 } 1024 1025 /** 1026 * Expand a partition around a single pivot. Partitioning exchanges array 1027 * elements such that all elements smaller than pivot are before it and all 1028 * elements larger than pivot are after it. The central region is already 1029 * partitioned. 1030 * 1031 * <pre>{@code 1032 * |l |s |p0 p1| e| r| 1033 * | ??? | <P | ==P | >P | ??? | 1034 * }</pre> 1035 * 1036 * <p>This requires that {@code start != end}. However it handles 1037 * {@code left == start} and/or {@code end == right}. 1038 * 1039 * @param a Data array. 1040 * @param left Lower bound (inclusive). 1041 * @param right Upper bound (inclusive). 1042 * @param start Start of the partition range (inclusive). 1043 * @param end End of the partitioned range (inclusive). 1044 * @param pivot0 Lower pivot location (inclusive). 1045 * @param pivot1 Upper pivot location (inclusive). 1046 * @param upper Upper bound (inclusive) of the pivot range [k1]. 1047 * @return Lower bound (inclusive) of the pivot range [k0]. 1048 */ 1049 // package-private for testing 1050 static int expandPartition(double[] a, int left, int right, int start, int end, 1051 int pivot0, int pivot1, int[] upper) { 1052 // 3-way partition of the data using a pivot value into 1053 // less-than, equal or greater-than. 1054 // Based on Sedgewick's Bentley-McIroy partitioning: always swap i<->j then 1055 // check for equal to the pivot and move again. 1056 // 1057 // Move sentinels from start and end to left and right. Scan towards the 1058 // sentinels until >=,<=. Swap then move == to the pivot region. 1059 // <-i j-> 1060 // |l | | |p0 p1| | | r| 1061 // |>=| ??? | < | == | > | ??? |<=| 1062 // 1063 // When either i or j reach the edge perform finishing loop. 1064 // Finish loop for a[j] <= v replaces j with p1+1, optionally moves value 1065 // to p0 for < and updates the pivot range p1 (and optionally p0): 1066 // j-> 1067 // |l |p0 p1| | | r| 1068 // | < | == | > | ??? |<=| 1069 1070 final double v = a[pivot0]; 1071 // Use start/end as sentinels (requires start != end) 1072 double vi = a[start]; 1073 double vj = a[end]; 1074 a[start] = a[left]; 1075 a[end] = a[right]; 1076 a[left] = vj; 1077 a[right] = vi; 1078 1079 int i = start + 1; 1080 int j = end - 1; 1081 1082 // Positioned for pre-in/decrement to write to pivot region 1083 int p0 = pivot0 == start ? i : pivot0; 1084 int p1 = pivot1 == end ? j : pivot1; 1085 1086 while (true) { 1087 do { 1088 --i; 1089 } while (a[i] < v); 1090 do { 1091 ++j; 1092 } while (a[j] > v); 1093 vj = a[i]; 1094 vi = a[j]; 1095 a[i] = vi; 1096 a[j] = vj; 1097 // Move the equal values to pivot region 1098 if (vi == v) { 1099 a[i] = a[--p0]; 1100 a[p0] = v; 1101 } 1102 if (vj == v) { 1103 a[j] = a[++p1]; 1104 a[p1] = v; 1105 } 1106 // Termination check and finishing loops. 1107 // Note: This works even if pivot region is zero length (p1 == p0-1 due to 1108 // length 1 pivot region at either start/end) because we pre-inc/decrement 1109 // one side and post-inc/decrement the other side. 1110 if (i == left) { 1111 while (j < right) { 1112 do { 1113 ++j; 1114 } while (a[j] > v); 1115 final double w = a[j]; 1116 // Move upper bound of pivot region 1117 a[j] = a[++p1]; 1118 a[p1] = v; 1119 // Move lower bound of pivot region 1120 if (w != v) { 1121 a[p0] = w; 1122 p0++; 1123 } 1124 } 1125 break; 1126 } 1127 if (j == right) { 1128 while (i > left) { 1129 do { 1130 --i; 1131 } while (a[i] < v); 1132 final double w = a[i]; 1133 // Move lower bound of pivot region 1134 a[i] = a[--p0]; 1135 a[p0] = v; 1136 // Move upper bound of pivot region 1137 if (w != v) { 1138 a[p1] = w; 1139 p1--; 1140 } 1141 } 1142 break; 1143 } 1144 } 1145 1146 upper[0] = p1; 1147 return p0; 1148 } 1149 1150 /** 1151 * Partition the array such that indices {@code k} correspond to their 1152 * correctly sorted value in the equivalent fully sorted array. 1153 * 1154 * <p>For all indices {@code k} and any index {@code i}: 1155 * 1156 * <pre>{@code 1157 * data[i < k] <= data[k] <= data[k < i] 1158 * }</pre> 1159 * 1160 * <p>This function accepts a {@link UpdatingInterval} of indices {@code k} that define the 1161 * range of indices to partition. The {@link UpdatingInterval} can be narrowed or split as 1162 * partitioning divides the range. 1163 * 1164 * <p>Uses an introselect variant. The quickselect is a dual-pivot quicksort 1165 * partition method by Vladimir Yaroslavskiy; the fall-back on poor convergence of 1166 * the quickselect is a heapselect. 1167 * 1168 * <p>The {@code flags} contain the the current recursion count and the configured 1169 * length threshold for {@code r - l} to perform sort select. The count is in the upper 1170 * bits and the threshold is in the lower bits. 1171 * 1172 * @param a Values. 1173 * @param left Lower bound of data (inclusive, assumed to be strictly positive). 1174 * @param right Upper bound of data (inclusive, assumed to be strictly positive). 1175 * @param k Interval of indices to partition (ordered). 1176 * @param flags Control flags. 1177 */ 1178 // package-private for testing 1179 static void dualPivotQuickSelect(double[] a, int left, int right, UpdatingInterval k, int flags) { 1180 // If partitioning splits the interval then recursion is used for the left-most side(s) 1181 // and the right-most side remains within this function. If partitioning does 1182 // not split the interval then it remains within this function. 1183 int l = left; 1184 int r = right; 1185 int f = flags; 1186 int ka = k.left(); 1187 int kb = k.right(); 1188 final int[] upper = {0, 0, 0}; 1189 while (true) { 1190 // Select when ka and kb are close to the same end, 1191 // or the entire range is small 1192 // |l|-----|ka|--------|kb|------|r| 1193 final int n = r - l; 1194 if (Math.min(kb - l, r - ka) < DP_SORTSELECT_SIZE || 1195 n < (f & SORTSELECT_MASK)) { 1196 sortSelect(a, l, r, ka, kb); 1197 return; 1198 } 1199 if (kb - ka < DP_SORTSELECT_SIZE) { 1200 // Switch to single-pivot mode with Floyd-Rivest sub-sampling 1201 quickSelectAdaptive(a, l, r, ka, kb, upper, MODE_FR_SAMPLING); 1202 return; 1203 } 1204 if (f < 0) { 1205 // Excess recursion, switch to heap select 1206 heapSelect(a, l, r, ka, kb); 1207 return; 1208 } 1209 1210 // Dual-pivot partitioning 1211 final int p0 = partition(a, l, r, upper); 1212 final int p1 = upper[0]; 1213 1214 // Recursion to max depth 1215 // Note: Here we possibly branch left, middle and right with multiple keys. 1216 // It is possible that the partition has split the keys 1217 // and the recursion proceeds with a reduced set in each region. 1218 // p0 p1 p2 p3 1219 // |l|--|ka|--k----k--|P|------k--|kb|----|P|----|r| 1220 // kb | ka 1221 f += RECURSION_INCREMENT; 1222 // Recurse left side if required 1223 if (ka < p0) { 1224 if (kb <= p1) { 1225 // Entirely on left side 1226 r = p0 - 1; 1227 if (r < kb) { 1228 kb = k.updateRight(r); 1229 } 1230 continue; 1231 } 1232 dualPivotQuickSelect(a, l, p0 - 1, k.splitLeft(p0, p1), f); 1233 // Here we must process middle and/or right 1234 ka = k.left(); 1235 } else if (kb <= p1) { 1236 // No middle/right side 1237 return; 1238 } else if (ka <= p1) { 1239 // Advance lower bound 1240 ka = k.updateLeft(p1 + 1); 1241 } 1242 // Recurse middle if required 1243 final int p2 = upper[1]; 1244 final int p3 = upper[2]; 1245 if (ka < p2) { 1246 l = p1 + 1; 1247 if (kb <= p3) { 1248 // Entirely in middle 1249 r = p2 - 1; 1250 if (r < kb) { 1251 kb = k.updateRight(r); 1252 } 1253 continue; 1254 } 1255 dualPivotQuickSelect(a, l, p2 - 1, k.splitLeft(p2, p3), f); 1256 ka = k.left(); 1257 } else if (kb <= p3) { 1258 // No right side 1259 return; 1260 } else if (ka <= p3) { 1261 ka = k.updateLeft(p3 + 1); 1262 } 1263 // Continue right 1264 l = p3 + 1; 1265 } 1266 } 1267 1268 /** 1269 * Partition an array slice around 2 pivots. Partitioning exchanges array elements 1270 * such that all elements smaller than pivot are before it and all elements larger 1271 * than pivot are after it. 1272 * 1273 * <p>This method returns 4 points describing the pivot ranges of equal values. 1274 * 1275 * <pre>{@code 1276 * |k0 k1| |k2 k3| 1277 * | <P | ==P1 | <P1 && <P2 | ==P2 | >P | 1278 * }</pre> 1279 * 1280 * <ul> 1281 * <li>k0: lower pivot1 point 1282 * <li>k1: upper pivot1 point (inclusive) 1283 * <li>k2: lower pivot2 point 1284 * <li>k3: upper pivot2 point (inclusive) 1285 * </ul> 1286 * 1287 * <p>Bounds are set so {@code i < k0}, {@code i > k3} and {@code k1 < i < k2} are 1288 * unsorted. When the range {@code [k0, k3]} contains fully sorted elements the result 1289 * is set to {@code k1 = k3; k2 == k0}. This can occur if 1290 * {@code P1 == P2} or there are zero or one value between the pivots 1291 * {@code P1 < v < P2}. Any sort/partition of ranges [left, k0-1], [k1+1, k2-1] and 1292 * [k3+1, right] must check the length is {@code > 1}. 1293 * 1294 * @param a Data array. 1295 * @param left Lower bound (inclusive). 1296 * @param right Upper bound (inclusive). 1297 * @param bounds Points [k1, k2, k3]. 1298 * @return Lower bound (inclusive) of the pivot range [k0]. 1299 */ 1300 private static int partition(double[] a, int left, int right, int[] bounds) { 1301 // Pick 2 pivots from 5 approximately uniform through the range. 1302 // Spacing is ~ 1/7 made using shifts. Other strategies are equal or much 1303 // worse. 1/7 = 5/35 ~ 1/8 + 1/64 : 0.1429 ~ 0.1406 1304 // Ensure the value is above zero to choose different points! 1305 final int n = right - left; 1306 final int step = 1 + (n >>> 3) + (n >>> 6); 1307 final int i3 = left + (n >>> 1); 1308 final int i2 = i3 - step; 1309 final int i1 = i2 - step; 1310 final int i4 = i3 + step; 1311 final int i5 = i4 + step; 1312 Sorting.sort5(a, i1, i2, i3, i4, i5); 1313 1314 // Partition data using pivots P1 and P2 into less-than, greater-than or between. 1315 // Pivot values P1 & P2 are placed at the end. If P1 < P2, P2 acts as a sentinel. 1316 // k traverses the unknown region ??? and values moved if less-than or 1317 // greater-than: 1318 // 1319 // left less k great right 1320 // |P1| <P1 | P1 <= & <= P2 | ??? | >P2 |P2| 1321 // 1322 // <P1 (left, lt) 1323 // P1 <= & <= P2 [lt, k) 1324 // >P2 (gt, right) 1325 // 1326 // At the end pivots are swapped back to behind the less and great pointers. 1327 // 1328 // | <P1 |P1| P1<= & <= P2 |P2| >P2 | 1329 1330 // Swap ends to the pivot locations. 1331 final double v1 = a[i2]; 1332 a[i2] = a[left]; 1333 a[left] = v1; 1334 final double v2 = a[i4]; 1335 a[i4] = a[right]; 1336 a[right] = v2; 1337 1338 // pointers 1339 int less = left; 1340 int great = right; 1341 1342 // Fast-forward ascending / descending runs to reduce swaps. 1343 // Cannot overrun as end pivots (v1 <= v2) act as sentinels. 1344 do { 1345 ++less; 1346 } while (a[less] < v1); 1347 do { 1348 --great; 1349 } while (a[great] > v2); 1350 1351 // a[less - 1] < P1 : a[great + 1] > P2 1352 // unvisited in [less, great] 1353 SORTING: 1354 for (int k = less - 1; ++k <= great;) { 1355 final double v = a[k]; 1356 if (v < v1) { 1357 // swap(a, k, less++) 1358 a[k] = a[less]; 1359 a[less] = v; 1360 less++; 1361 } else if (v > v2) { 1362 // while k < great and a[great] > v2: 1363 // great-- 1364 while (a[great] > v2) { 1365 if (great-- == k) { 1366 // Done 1367 break SORTING; 1368 } 1369 } 1370 // swap(a, k, great--) 1371 // if a[k] < v1: 1372 // swap(a, k, less++) 1373 final double w = a[great]; 1374 a[great] = v; 1375 great--; 1376 // delay a[k] = w 1377 if (w < v1) { 1378 a[k] = a[less]; 1379 a[less] = w; 1380 less++; 1381 } else { 1382 a[k] = w; 1383 } 1384 } 1385 } 1386 1387 // Change to inclusive ends : a[less] < P1 : a[great] > P2 1388 less--; 1389 great++; 1390 // Move the pivots to correct locations 1391 a[left] = a[less]; 1392 a[less] = v1; 1393 a[right] = a[great]; 1394 a[great] = v2; 1395 1396 // Record the pivot locations 1397 final int lower = less; 1398 bounds[2] = great; 1399 1400 // equal elements 1401 // Original paper: If middle partition is bigger than a threshold 1402 // then check for equal elements. 1403 1404 // Note: This is extra work. When performing partitioning the region of interest 1405 // may be entirely above or below the central region and this can be skipped. 1406 1407 // Here we look for equal elements if the centre is more than 5/8 the length. 1408 // 5/8 = 1/2 + 1/8. Pivots must be different. 1409 if ((great - less) > (n >>> 1) + (n >>> 3) && v1 != v2) { 1410 1411 // Fast-forward to reduce swaps. Changes inclusive ends to exclusive ends. 1412 // Since v1 != v2 these act as sentinels to prevent overrun. 1413 do { 1414 ++less; 1415 } while (a[less] == v1); 1416 do { 1417 --great; 1418 } while (a[great] == v2); 1419 1420 // This copies the logic in the sorting loop using == comparisons 1421 EQUAL: 1422 for (int k = less - 1; ++k <= great;) { 1423 final double v = a[k]; 1424 if (v == v1) { 1425 a[k] = a[less]; 1426 a[less] = v; 1427 less++; 1428 } else if (v == v2) { 1429 while (a[great] == v2) { 1430 if (great-- == k) { 1431 // Done 1432 break EQUAL; 1433 } 1434 } 1435 final double w = a[great]; 1436 a[great] = v; 1437 great--; 1438 if (w == v1) { 1439 a[k] = a[less]; 1440 a[less] = w; 1441 less++; 1442 } else { 1443 a[k] = w; 1444 } 1445 } 1446 } 1447 1448 // Change to inclusive ends 1449 less--; 1450 great++; 1451 } 1452 1453 // Between pivots in (less, great) 1454 if (v1 != v2 && less < great - 1) { 1455 // Record the pivot end points 1456 bounds[0] = less; 1457 bounds[1] = great; 1458 } else { 1459 // No unsorted internal region (set k1 = k3; k2 = k0) 1460 bounds[0] = bounds[2]; 1461 bounds[1] = lower; 1462 } 1463 1464 return lower; 1465 } 1466 1467 /** 1468 * Partition the elements between {@code ka} and {@code kb} using a heap select 1469 * algorithm. It is assumed {@code left <= ka <= kb <= right}. 1470 * 1471 * @param a Data array to use to find out the K<sup>th</sup> value. 1472 * @param left Lower bound (inclusive). 1473 * @param right Upper bound (inclusive). 1474 * @param ka Lower index to select. 1475 * @param kb Upper index to select. 1476 */ 1477 static void heapSelect(int[] a, int left, int right, int ka, int kb) { 1478 if (right <= left) { 1479 return; 1480 } 1481 // Use the smallest heap 1482 if (kb - left < right - ka) { 1483 heapSelectLeft(a, left, right, ka, kb); 1484 } else { 1485 heapSelectRight(a, left, right, ka, kb); 1486 } 1487 } 1488 1489 /** 1490 * Partition the elements between {@code ka} and {@code kb} using a heap select 1491 * algorithm. It is assumed {@code left <= ka <= kb <= right}. 1492 * 1493 * <p>For best performance this should be called with {@code k} in the lower 1494 * half of the range. 1495 * 1496 * @param a Data array to use to find out the K<sup>th</sup> value. 1497 * @param left Lower bound (inclusive). 1498 * @param right Upper bound (inclusive). 1499 * @param ka Lower index to select. 1500 * @param kb Upper index to select. 1501 */ 1502 static void heapSelectLeft(int[] a, int left, int right, int ka, int kb) { 1503 // Create a max heap in-place in [left, k], rooted at a[left] = max 1504 // |l|-max-heap-|k|--------------| 1505 // Build the heap using Floyd's heap-construction algorithm for heap size n. 1506 // Start at parent of the last element in the heap (k), 1507 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = k - left 1508 int end = kb + 1; 1509 for (int p = left + ((kb - left - 1) >> 1); p >= left; p--) { 1510 maxHeapSiftDown(a, a[p], p, left, end); 1511 } 1512 // Scan the remaining data and insert 1513 // Mitigate worst case performance on descending data by backward sweep 1514 int max = a[left]; 1515 for (int i = right + 1; --i > kb;) { 1516 final int v = a[i]; 1517 if (v < max) { 1518 a[i] = max; 1519 maxHeapSiftDown(a, v, left, left, end); 1520 max = a[left]; 1521 } 1522 } 1523 // Partition [ka, kb] 1524 // |l|-max-heap-|k|--------------| 1525 // | <-swap-> | then sift down reduced size heap 1526 // Avoid sifting heap of size 1 1527 final int last = Math.max(left, ka - 1); 1528 while (--end > last) { 1529 maxHeapSiftDown(a, a[end], left, left, end); 1530 a[end] = max; 1531 max = a[left]; 1532 } 1533 } 1534 1535 /** 1536 * Sift the element down the max heap. 1537 * 1538 * <p>Assumes {@code root <= p < end}, i.e. the max heap is above root. 1539 * 1540 * @param a Heap data. 1541 * @param v Value to sift. 1542 * @param p Start position. 1543 * @param root Root of the heap. 1544 * @param end End of the heap (exclusive). 1545 */ 1546 private static void maxHeapSiftDown(int[] a, int v, int p, int root, int end) { 1547 // child2 = root + 2 * (parent - root) + 2 1548 // = 2 * parent - root + 2 1549 while (true) { 1550 // Right child 1551 int c = (p << 1) - root + 2; 1552 if (c > end) { 1553 // No left child 1554 break; 1555 } 1556 // Use the left child if right doesn't exist, or it is greater 1557 if (c == end || a[c] < a[c - 1]) { 1558 --c; 1559 } 1560 if (v >= a[c]) { 1561 // Parent greater than largest child - done 1562 break; 1563 } 1564 // Swap and descend 1565 a[p] = a[c]; 1566 p = c; 1567 } 1568 a[p] = v; 1569 } 1570 1571 /** 1572 * Partition the elements between {@code ka} and {@code kb} using a heap select 1573 * algorithm. It is assumed {@code left <= ka <= kb <= right}. 1574 * 1575 * <p>For best performance this should be called with {@code k} in the upper 1576 * half of the range. 1577 * 1578 * @param a Data array to use to find out the K<sup>th</sup> value. 1579 * @param left Lower bound (inclusive). 1580 * @param right Upper bound (inclusive). 1581 * @param ka Lower index to select. 1582 * @param kb Upper index to select. 1583 */ 1584 static void heapSelectRight(int[] a, int left, int right, int ka, int kb) { 1585 // Create a min heap in-place in [k, right], rooted at a[right] = min 1586 // |--------------|k|-min-heap-|r| 1587 // Build the heap using Floyd's heap-construction algorithm for heap size n. 1588 // Start at parent of the last element in the heap (k), 1589 // i.e. start = parent(n-1) : parent(c) = floor((c - 1) / 2) : c = right - k 1590 int end = ka - 1; 1591 for (int p = right - ((right - ka - 1) >> 1); p <= right; p++) { 1592 minHeapSiftDown(a, a[p], p, right, end); 1593 } 1594 // Scan the remaining data and insert 1595 // Mitigate worst case performance on descending data by backward sweep 1596 int min = a[right]; 1597 for (int i = left - 1; ++i < ka;) { 1598 final int v = a[i]; 1599 if (v > min) { 1600 a[i] = min; 1601 minHeapSiftDown(a, v, right, right, end); 1602 min = a[right]; 1603 } 1604 } 1605 // Partition [ka, kb] 1606 // |--------------|k|-min-heap-|r| 1607 // | <-swap-> | then sift down reduced size heap 1608 // Avoid sifting heap of size 1 1609 final int last = Math.min(right, kb + 1); 1610 while (++end < last) { 1611 minHeapSiftDown(a, a[end], right, right, end); 1612 a[end] = min; 1613 min = a[right]; 1614 } 1615 } 1616 1617 /** 1618 * Sift the element down the min heap. 1619 * 1620 * <p>Assumes {@code root >= p > end}, i.e. the max heap is below root. 1621 * 1622 * @param a Heap data. 1623 * @param v Value to sift. 1624 * @param p Start position. 1625 * @param root Root of the heap. 1626 * @param end End of the heap (exclusive). 1627 */ 1628 private static void minHeapSiftDown(int[] a, int v, int p, int root, int end) { 1629 // child2 = root - 2 * (root - parent) - 2 1630 // = 2 * parent - root - 2 1631 while (true) { 1632 // Right child 1633 int c = (p << 1) - root - 2; 1634 if (c < end) { 1635 // No left child 1636 break; 1637 } 1638 // Use the left child if right doesn't exist, or it is less 1639 if (c == end || a[c] > a[c + 1]) { 1640 ++c; 1641 } 1642 if (v <= a[c]) { 1643 // Parent less than smallest child - done 1644 break; 1645 } 1646 // Swap and descend 1647 a[p] = a[c]; 1648 p = c; 1649 } 1650 a[p] = v; 1651 } 1652 1653 /** 1654 * Partition the elements between {@code ka} and {@code kb} using a sort select 1655 * algorithm. It is assumed {@code left <= ka <= kb <= right}. 1656 * 1657 * @param a Data array to use to find out the K<sup>th</sup> value. 1658 * @param left Lower bound (inclusive). 1659 * @param right Upper bound (inclusive). 1660 * @param ka Lower index to select. 1661 * @param kb Upper index to select. 1662 */ 1663 static void sortSelect(int[] a, int left, int right, int ka, int kb) { 1664 // Combine the test for right <= left with 1665 // avoiding the overhead of sort select on tiny data. 1666 if (right - left <= MIN_SORTSELECT_SIZE) { 1667 Sorting.sort(a, left, right); 1668 return; 1669 } 1670 // Sort the smallest side 1671 if (kb - left < right - ka) { 1672 sortSelectLeft(a, left, right, kb); 1673 } else { 1674 sortSelectRight(a, left, right, ka); 1675 } 1676 } 1677 1678 /** 1679 * Partition the minimum {@code n} elements below {@code k} where 1680 * {@code n = k - left + 1}. Uses an insertion sort algorithm. 1681 * 1682 * <p>Works with any {@code k} in the range {@code left <= k <= right} 1683 * and performs a full sort of the range below {@code k}. 1684 * 1685 * <p>For best performance this should be called with 1686 * {@code k - left < right - k}, i.e. 1687 * to partition a value in the lower half of the range. 1688 * 1689 * @param a Data array to use to find out the K<sup>th</sup> value. 1690 * @param left Lower bound (inclusive). 1691 * @param right Upper bound (inclusive). 1692 * @param k Index to select. 1693 */ 1694 static void sortSelectLeft(int[] a, int left, int right, int k) { 1695 // Sort 1696 for (int i = left; ++i <= k;) { 1697 final int v = a[i]; 1698 // Move preceding higher elements above (if required) 1699 if (v < a[i - 1]) { 1700 int j = i; 1701 while (--j >= left && v < a[j]) { 1702 a[j + 1] = a[j]; 1703 } 1704 a[j + 1] = v; 1705 } 1706 } 1707 // Scan the remaining data and insert 1708 // Mitigate worst case performance on descending data by backward sweep 1709 int m = a[k]; 1710 for (int i = right + 1; --i > k;) { 1711 final int v = a[i]; 1712 if (v < m) { 1713 a[i] = m; 1714 int j = k; 1715 while (--j >= left && v < a[j]) { 1716 a[j + 1] = a[j]; 1717 } 1718 a[j + 1] = v; 1719 m = a[k]; 1720 } 1721 } 1722 } 1723 1724 /** 1725 * Partition the maximum {@code n} elements above {@code k} where 1726 * {@code n = right - k + 1}. Uses an insertion sort algorithm. 1727 * 1728 * <p>Works with any {@code k} in the range {@code left <= k <= right} 1729 * and can be used to perform a full sort of the range above {@code k}. 1730 * 1731 * <p>For best performance this should be called with 1732 * {@code k - left > right - k}, i.e. 1733 * to partition a value in the upper half of the range. 1734 * 1735 * @param a Data array to use to find out the K<sup>th</sup> value. 1736 * @param left Lower bound (inclusive). 1737 * @param right Upper bound (inclusive). 1738 * @param k Index to select. 1739 */ 1740 static void sortSelectRight(int[] a, int left, int right, int k) { 1741 // Sort 1742 for (int i = right; --i >= k;) { 1743 final int v = a[i]; 1744 // Move succeeding lower elements below (if required) 1745 if (v > a[i + 1]) { 1746 int j = i; 1747 while (++j <= right && v > a[j]) { 1748 a[j - 1] = a[j]; 1749 } 1750 a[j - 1] = v; 1751 } 1752 } 1753 // Scan the remaining data and insert 1754 // Mitigate worst case performance on descending data by backward sweep 1755 int m = a[k]; 1756 for (int i = left - 1; ++i < k;) { 1757 final int v = a[i]; 1758 if (v > m) { 1759 a[i] = m; 1760 int j = k; 1761 while (++j <= right && v > a[j]) { 1762 a[j - 1] = a[j]; 1763 } 1764 a[j - 1] = v; 1765 m = a[k]; 1766 } 1767 } 1768 } 1769 1770 /** 1771 * Partition the array such that index {@code k} corresponds to its correctly 1772 * sorted value in the equivalent fully sorted array. 1773 * 1774 * <p>Assumes {@code k} is a valid index into [left, right]. 1775 * 1776 * @param a Values. 1777 * @param left Lower bound of data (inclusive). 1778 * @param right Upper bound of data (inclusive). 1779 * @param k Index. 1780 */ 1781 static void select(int[] a, int left, int right, int k) { 1782 quickSelectAdaptive(a, left, right, k, k, new int[1], MODE_FR_SAMPLING); 1783 } 1784 1785 /** 1786 * Partition the array such that indices {@code k} correspond to their correctly 1787 * sorted value in the equivalent fully sorted array. 1788 * 1789 * <p>The count of the number of used indices is returned. If the keys are sorted in-place, 1790 * the count is returned as a negative. 1791 * 1792 * @param a Values. 1793 * @param left Lower bound of data (inclusive). 1794 * @param right Upper bound of data (inclusive). 1795 * @param k Indices (may be destructively modified). 1796 * @param n Count of indices. 1797 * @throws IndexOutOfBoundsException if any index {@code k} is not within the 1798 * sub-range {@code [left, right]} 1799 */ 1800 static void select(int[] a, int left, int right, int[] k, int n) { 1801 if (n == 1) { 1802 quickSelectAdaptive(a, left, right, k[0], k[0], new int[1], MODE_FR_SAMPLING); 1803 return; 1804 } 1805 1806 // Interval creation validates the indices are in [left, right] 1807 final UpdatingInterval keys = IndexSupport.createUpdatingInterval(left, right, k, n); 1808 1809 // Note: If the keys are not separated then they are effectively a single key. 1810 // Any split of keys separated by the sort select size 1811 // will be finished on the next iteration. 1812 final int k1 = keys.left(); 1813 final int kn = keys.right(); 1814 if (kn - k1 < DP_SORTSELECT_SIZE) { 1815 quickSelectAdaptive(a, left, right, k1, kn, new int[1], MODE_FR_SAMPLING); 1816 } else { 1817 // Dual-pivot mode with small range sort length configured using index density 1818 dualPivotQuickSelect(a, left, right, keys, dualPivotFlags(left, right, k1, kn)); 1819 } 1820 } 1821 1822 /** 1823 * Partition the array such that indices {@code k} correspond to their 1824 * correctly sorted value in the equivalent fully sorted array. 1825 * 1826 * <p>For all indices {@code [ka, kb]} and any index {@code i}: 1827 * 1828 * <pre>{@code 1829 * data[i < ka] <= data[ka] <= data[kb] <= data[kb < i] 1830 * }</pre> 1831 * 1832 * <p>This function accepts indices {@code [ka, kb]} that define the 1833 * range of indices to partition. It is expected that the range is small. 1834 * 1835 * <p>The {@code flags} are used to control the sampling mode and adaption of 1836 * the index within the sample. 1837 * 1838 * <p>Returns the bounds containing {@code [ka, kb]}. These may be lower/higher 1839 * than the keys if equal values are present in the data. 1840 * 1841 * @param a Values. 1842 * @param left Lower bound of data (inclusive, assumed to be strictly positive). 1843 * @param right Upper bound of data (inclusive, assumed to be strictly positive). 1844 * @param ka First key of interest. 1845 * @param kb Last key of interest. 1846 * @param bounds Upper bound of the range containing {@code [ka, kb]} (inclusive). 1847 * @param flags Adaption flags. 1848 * @return Lower bound of the range containing {@code [ka, kb]} (inclusive). 1849 */ 1850 static int quickSelectAdaptive(int[] a, int left, int right, int ka, int kb, 1851 int[] bounds, int flags) { 1852 int l = left; 1853 int r = right; 1854 int m = flags; 1855 while (true) { 1856 // Select when ka and kb are close to the same end 1857 // |l|-----|ka|kkkkkkkk|kb|------|r| 1858 if (Math.min(kb - l, r - ka) < LINEAR_SORTSELECT_SIZE) { 1859 sortSelect(a, l, r, ka, kb); 1860 bounds[0] = kb; 1861 return ka; 1862 } 1863 1864 // Only target ka; kb is assumed to be close 1865 int p0; 1866 final int n = r - l; 1867 // f in [0, 1] 1868 final double f = (double) (ka - l) / n; 1869 // Record the larger margin (start at 1/4) to create the estimated size. 1870 // step L R 1871 // far left 1/12 1/3 (use 1/4 + 1/32 + 1/64 ~ 0.328) 1872 // left 1/6 1/4 1873 // middle 2/9 2/9 (use 1/4 - 1/32 ~ 0.219) 1874 int margin = n >> 2; 1875 if (m < MODE_SAMPLING && r - l > FR_SAMPLING_SIZE) { 1876 // Floyd-Rivest sample step uses the same margins 1877 p0 = sampleStep(a, l, r, ka, bounds); 1878 if (f <= STEP_FAR_LEFT || f >= STEP_FAR_RIGHT) { 1879 margin += (n >> 5) + (n >> 6); 1880 } else if (f > STEP_LEFT && f < STEP_RIGHT) { 1881 margin -= n >> 5; 1882 } 1883 } else if (f <= STEP_LEFT) { 1884 if (f <= STEP_FAR_LEFT) { 1885 margin += (n >> 5) + (n >> 6); 1886 p0 = repeatedStepFarLeft(a, l, r, ka, bounds, m); 1887 } else { 1888 p0 = repeatedStepLeft(a, l, r, ka, bounds, m); 1889 } 1890 } else if (f >= STEP_RIGHT) { 1891 if (f >= STEP_FAR_RIGHT) { 1892 margin += (n >> 5) + (n >> 6); 1893 p0 = repeatedStepFarRight(a, l, r, ka, bounds, m); 1894 } else { 1895 p0 = repeatedStepRight(a, l, r, ka, bounds, m); 1896 } 1897 } else { 1898 margin -= n >> 5; 1899 p0 = repeatedStep(a, l, r, ka, bounds, m); 1900 } 1901 1902 // Note: Here we expect [ka, kb] to be small and splitting is unlikely. 1903 // p0 p1 1904 // |l|--|ka|kkkk|kb|--|P|-------------------|r| 1905 // |l|----------------|P|--|ka|kkk|kb|------|r| 1906 // |l|-----------|ka|k|P|k|kb|--------------|r| 1907 final int p1 = bounds[0]; 1908 if (kb < p0) { 1909 // Entirely on left side 1910 r = p0 - 1; 1911 } else if (ka > p1) { 1912 // Entirely on right side 1913 l = p1 + 1; 1914 } else { 1915 // Pivot splits [ka, kb]. Expect ends to be close to the pivot and finish. 1916 // Here we set the bounds for use after median-of-medians pivot selection. 1917 // In the event there are many equal values this allows collecting those 1918 // known to be equal together when moving around the medians sample. 1919 if (kb > p1) { 1920 sortSelectLeft(a, p1 + 1, r, kb); 1921 bounds[0] = kb; 1922 } 1923 if (ka < p0) { 1924 sortSelectRight(a, l, p0 - 1, ka); 1925 p0 = ka; 1926 } 1927 return p0; 1928 } 1929 // Update mode based on target partition size 1930 if (r - l > n - margin) { 1931 m++; 1932 } 1933 } 1934 } 1935 1936 /** 1937 * Partition an array slice around a pivot. Partitioning exchanges array elements such 1938 * that all elements smaller than pivot are before it and all elements larger than 1939 * pivot are after it. 1940 * 1941 * <p>Partitions a Floyd-Rivest sample around a pivot offset so that the input {@code k} will 1942 * fall in the smaller partition when the entire range is partitioned. 1943 * 1944 * <p>Assumes the range {@code r - l} is large. 1945 * 1946 * @param a Data array. 1947 * @param l Lower bound (inclusive). 1948 * @param r Upper bound (inclusive). 1949 * @param k Target index. 1950 * @param upper Upper bound (inclusive) of the pivot range. 1951 * @return Lower bound (inclusive) of the pivot range. 1952 */ 1953 private static int sampleStep(int[] a, int l, int r, int k, int[] upper) { 1954 // Floyd-Rivest: use SELECT recursively on a sample of size S to get an estimate 1955 // for the (k-l+1)-th smallest element into a[k], biased slightly so that the 1956 // (k-l+1)-th element is expected to lie in the smaller set after partitioning. 1957 final int n = r - l + 1; 1958 final int ith = k - l + 1; 1959 final double z = Math.log(n); 1960 // sample size = 0.5 * n^(2/3) 1961 final double s = 0.5 * Math.exp(0.6666666666666666 * z); 1962 final double sd = 0.5 * Math.sqrt(z * s * (n - s) / n) * Integer.signum(ith - (n >> 1)); 1963 final int ll = Math.max(l, (int) (k - ith * s / n + sd)); 1964 final int rr = Math.min(r, (int) (k + (n - ith) * s / n + sd)); 1965 // Sample recursion restarts from [ll, rr] 1966 final int p = quickSelectAdaptive(a, ll, rr, k, k, upper, MODE_FR_SAMPLING); 1967 return expandPartition(a, l, r, ll, rr, p, upper[0], upper); 1968 } 1969 1970 /** 1971 * Partition an array slice around a pivot. Partitioning exchanges array elements such 1972 * that all elements smaller than pivot are before it and all elements larger than 1973 * pivot are after it. 1974 * 1975 * <p>Assumes the range {@code r - l >= 8}; the caller is responsible for selection on a smaller 1976 * range. If using a 12th-tile for sampling then assumes {@code r - l >= 11}. 1977 * 1978 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm 1979 * with the median of 3 then median of 3; the final sample is placed in the 1980 * 5th 9th-tile; the pivot chosen from the sample is adaptive using the input {@code k}. 1981 * 1982 * <p>Given a pivot in the middle of the sample this has margins of 2/9 and 2/9. 1983 * 1984 * @param a Data array. 1985 * @param l Lower bound (inclusive). 1986 * @param r Upper bound (inclusive). 1987 * @param k Target index. 1988 * @param upper Upper bound (inclusive) of the pivot range. 1989 * @param flags Control flags. 1990 * @return Lower bound (inclusive) of the pivot range. 1991 */ 1992 private static int repeatedStep(int[] a, int l, int r, int k, int[] upper, int flags) { 1993 // Adapted from Alexandrescu (2016), algorithm 8. 1994 final int fp; 1995 final int s; 1996 int p; 1997 if (flags <= MODE_SAMPLING) { 1998 // Median into a 12th-tile 1999 fp = (r - l + 1) / 12; 2000 // Position the sample around the target k 2001 s = k - mapDistance(k - l, l, r, fp); 2002 p = k; 2003 } else { 2004 // i in tertile [3f':6f') 2005 fp = (r - l + 1) / 9; 2006 final int f3 = 3 * fp; 2007 final int end = l + (f3 << 1); 2008 for (int i = l + f3; i < end; i++) { 2009 Sorting.sort3(a, i - f3, i, i + f3); 2010 } 2011 // 5th 9th-tile: [4f':5f') 2012 s = l + (fp << 2); 2013 // No adaption uses the middle to enforce strict margins 2014 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1)); 2015 } 2016 final int e = s + fp - 1; 2017 for (int i = s; i <= e; i++) { 2018 Sorting.sort3(a, i - fp, i, i + fp); 2019 } 2020 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING); 2021 return expandPartition(a, l, r, s, e, p, upper[0], upper); 2022 } 2023 2024 /** 2025 * Partition an array slice around a pivot. Partitioning exchanges array elements such 2026 * that all elements smaller than pivot are before it and all elements larger than 2027 * pivot are after it. 2028 * 2029 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller 2030 * range. 2031 * 2032 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm 2033 * with the lower median of 4 then either median of 3 with the final sample placed in the 2034 * 5th 12th-tile, or min of 3 with the final sample in the 4th 12th-tile; 2035 * the pivot chosen from the sample is adaptive using the input {@code k}. 2036 * 2037 * <p>Given a pivot in the middle of the sample this has margins of 1/6 and 1/4. 2038 * 2039 * @param a Data array. 2040 * @param l Lower bound (inclusive). 2041 * @param r Upper bound (inclusive). 2042 * @param k Target index. 2043 * @param upper Upper bound (inclusive) of the pivot range. 2044 * @param flags Control flags. 2045 * @return Lower bound (inclusive) of the pivot range. 2046 */ 2047 private static int repeatedStepLeft(int[] a, int l, int r, int k, int[] upper, int flags) { 2048 // Adapted from Alexandrescu (2016), algorithm 9. 2049 final int fp; 2050 final int s; 2051 int p; 2052 if (flags <= MODE_SAMPLING) { 2053 // Median into a 12th-tile 2054 fp = (r - l + 1) / 12; 2055 // Position the sample around the target k 2056 // Avoid bounds error due to rounding as (k-l)/(r-l) -> 1/12 2057 s = Math.max(k - mapDistance(k - l, l, r, fp), l + fp); 2058 p = k; 2059 } else { 2060 // i in 2nd quartile 2061 final int f = (r - l + 1) >> 2; 2062 final int f2 = f + f; 2063 final int end = l + f2; 2064 for (int i = l + f; i < end; i++) { 2065 Sorting.lowerMedian4(a, i - f, i, i + f, i + f2); 2066 } 2067 // i in 5th 12th-tile 2068 fp = f / 3; 2069 s = l + f + fp; 2070 // No adaption uses the middle to enforce strict margins 2071 p = s + (flags == MODE_ADAPTION ? mapDistance(k - l, l, r, fp) : (fp >>> 1)); 2072 } 2073 final int e = s + fp - 1; 2074 for (int i = s; i <= e; i++) { 2075 Sorting.sort3(a, i - fp, i, i + fp); 2076 } 2077 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING); 2078 return expandPartition(a, l, r, s, e, p, upper[0], upper); 2079 } 2080 2081 /** 2082 * Partition an array slice around a pivot. Partitioning exchanges array elements such 2083 * that all elements smaller than pivot are before it and all elements larger than 2084 * pivot are after it. 2085 * 2086 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller 2087 * range. 2088 * 2089 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm 2090 * with the upper median of 4 then either median of 3 with the final sample placed in the 2091 * 8th 12th-tile, or max of 3 with the final sample in the 9th 12th-tile; 2092 * the pivot chosen from the sample is adaptive using the input {@code k}. 2093 * 2094 * <p>Given a pivot in the middle of the sample this has margins of 1/4 and 1/6. 2095 * 2096 * @param a Data array. 2097 * @param l Lower bound (inclusive). 2098 * @param r Upper bound (inclusive). 2099 * @param k Target index. 2100 * @param upper Upper bound (inclusive) of the pivot range. 2101 * @param flags Control flags. 2102 * @return Lower bound (inclusive) of the pivot range. 2103 */ 2104 private static int repeatedStepRight(int[] a, int l, int r, int k, int[] upper, int flags) { 2105 // Mirror image repeatedStepLeft using upper median into 3rd quartile 2106 final int fp; 2107 final int e; 2108 int p; 2109 if (flags <= MODE_SAMPLING) { 2110 // Median into a 12th-tile 2111 fp = (r - l + 1) / 12; 2112 // Position the sample around the target k 2113 // Avoid bounds error due to rounding as (r-k)/(r-l) -> 11/12 2114 e = Math.min(k + mapDistance(r - k, l, r, fp), r - fp); 2115 p = k; 2116 } else { 2117 // i in 3rd quartile 2118 final int f = (r - l + 1) >> 2; 2119 final int f2 = f + f; 2120 final int end = r - f2; 2121 for (int i = r - f; i > end; i--) { 2122 Sorting.upperMedian4(a, i - f2, i - f, i, i + f); 2123 } 2124 // i in 8th 12th-tile 2125 fp = f / 3; 2126 e = r - f - fp; 2127 // No adaption uses the middle to enforce strict margins 2128 p = e - (flags == MODE_ADAPTION ? mapDistance(r - k, l, r, fp) : (fp >>> 1)); 2129 } 2130 final int s = e - fp + 1; 2131 for (int i = s; i <= e; i++) { 2132 Sorting.sort3(a, i - fp, i, i + fp); 2133 } 2134 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING); 2135 return expandPartition(a, l, r, s, e, p, upper[0], upper); 2136 } 2137 2138 /** 2139 * Partition an array slice around a pivot. Partitioning exchanges array elements such 2140 * that all elements smaller than pivot are before it and all elements larger than 2141 * pivot are after it. 2142 * 2143 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller 2144 * range. 2145 * 2146 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm 2147 * with the minimum of 4 then median of 3; the final sample is placed in the 2148 * 2nd 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}. 2149 * 2150 * <p>Given a pivot in the middle of the sample this has margins of 1/12 and 1/3. 2151 * 2152 * @param a Data array. 2153 * @param l Lower bound (inclusive). 2154 * @param r Upper bound (inclusive). 2155 * @param k Target index. 2156 * @param upper Upper bound (inclusive) of the pivot range. 2157 * @param flags Control flags. 2158 * @return Lower bound (inclusive) of the pivot range. 2159 */ 2160 private static int repeatedStepFarLeft(int[] a, int l, int r, int k, int[] upper, int flags) { 2161 // Far step has been changed from the Alexandrescu (2016) step of lower-median-of-4, min-of-3 2162 // into the 4th 12th-tile to a min-of-4, median-of-3 into the 2nd 12th-tile. 2163 // The differences are: 2164 // - The upper margin when not sampling is 8/24 vs. 9/24; the lower margin remains at 1/12. 2165 // - The position of the sample is closer to the expected location of k < |A| / 12. 2166 // - Sampling mode uses a median-of-3 with adaptive k, matching the other step methods. 2167 // A min-of-3 sample can create a pivot too small if used with adaption of k leaving 2168 // k in the larger parition and a wasted iteration. 2169 // - Adaption is adjusted to force use of the lower margin when not sampling. 2170 final int fp; 2171 final int s; 2172 int p; 2173 if (flags <= MODE_SAMPLING) { 2174 // 2nd 12th-tile 2175 fp = (r - l + 1) / 12; 2176 s = l + fp; 2177 // Use adaption 2178 p = s + mapDistance(k - l, l, r, fp); 2179 } else { 2180 // i in 2nd quartile; min into i-f (1st quartile) 2181 final int f = (r - l + 1) >> 2; 2182 final int f2 = f + f; 2183 final int end = l + f2; 2184 for (int i = l + f; i < end; i++) { 2185 if (a[i + f] < a[i - f]) { 2186 final int u = a[i + f]; 2187 a[i + f] = a[i - f]; 2188 a[i - f] = u; 2189 } 2190 if (a[i + f2] < a[i]) { 2191 final int v = a[i + f2]; 2192 a[i + f2] = a[i]; 2193 a[i] = v; 2194 } 2195 if (a[i] < a[i - f]) { 2196 final int u = a[i]; 2197 a[i] = a[i - f]; 2198 a[i - f] = u; 2199 } 2200 } 2201 // 2nd 12th-tile 2202 fp = f / 3; 2203 s = l + fp; 2204 // Lower margin has 2(d+1) elements; d == (position in sample) - s 2205 // Force k into the lower margin 2206 p = s + ((k - l) >>> 1); 2207 } 2208 final int e = s + fp - 1; 2209 for (int i = s; i <= e; i++) { 2210 Sorting.sort3(a, i - fp, i, i + fp); 2211 } 2212 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING); 2213 return expandPartition(a, l, r, s, e, p, upper[0], upper); 2214 } 2215 2216 /** 2217 * Partition an array slice around a pivot. Partitioning exchanges array elements such 2218 * that all elements smaller than pivot are before it and all elements larger than 2219 * pivot are after it. 2220 * 2221 * <p>Assumes the range {@code r - l >= 11}; the caller is responsible for selection on a smaller 2222 * range. 2223 * 2224 * <p>Uses the Chen and Dumitrescu repeated step median-of-medians-of-medians algorithm 2225 * with the maximum of 4 then median of 3; the final sample is placed in the 2226 * 11th 12th-tile; the pivot chosen from the sample is adaptive using the input {@code k}. 2227 * 2228 * <p>Given a pivot in the middle of the sample this has margins of 1/3 and 1/12. 2229 * 2230 * @param a Data array. 2231 * @param l Lower bound (inclusive). 2232 * @param r Upper bound (inclusive). 2233 * @param k Target index. 2234 * @param upper Upper bound (inclusive) of the pivot range. 2235 * @param flags Control flags. 2236 * @return Lower bound (inclusive) of the pivot range. 2237 */ 2238 private static int repeatedStepFarRight(int[] a, int l, int r, int k, int[] upper, int flags) { 2239 // Mirror image repeatedStepFarLeft 2240 final int fp; 2241 final int e; 2242 int p; 2243 if (flags <= MODE_SAMPLING) { 2244 // 11th 12th-tile 2245 fp = (r - l + 1) / 12; 2246 e = r - fp; 2247 // Use adaption 2248 p = e - mapDistance(r - k, l, r, fp); 2249 } else { 2250 // i in 3rd quartile; max into i+f (4th quartile) 2251 final int f = (r - l + 1) >> 2; 2252 final int f2 = f + f; 2253 final int end = r - f2; 2254 for (int i = r - f; i > end; i--) { 2255 if (a[i - f] > a[i + f]) { 2256 final int u = a[i - f]; 2257 a[i - f] = a[i + f]; 2258 a[i + f] = u; 2259 } 2260 if (a[i - f2] > a[i]) { 2261 final int v = a[i - f2]; 2262 a[i - f2] = a[i]; 2263 a[i] = v; 2264 } 2265 if (a[i] > a[i + f]) { 2266 final int u = a[i]; 2267 a[i] = a[i + f]; 2268 a[i + f] = u; 2269 } 2270 } 2271 // 11th 12th-tile 2272 fp = f / 3; 2273 e = r - fp; 2274 // Upper margin has 2(d+1) elements; d == e - (position in sample) 2275 // Force k into the upper margin 2276 p = e - ((r - k) >>> 1); 2277 } 2278 final int s = e - fp + 1; 2279 for (int i = s; i <= e; i++) { 2280 Sorting.sort3(a, i - fp, i, i + fp); 2281 } 2282 p = quickSelectAdaptive(a, s, e, p, p, upper, MODE_FR_SAMPLING); 2283 return expandPartition(a, l, r, s, e, p, upper[0], upper); 2284 } 2285 2286 /** 2287 * Expand a partition around a single pivot. Partitioning exchanges array 2288 * elements such that all elements smaller than pivot are before it and all 2289 * elements larger than pivot are after it. The central region is already 2290 * partitioned. 2291 * 2292 * <pre>{@code 2293 * |l |s |p0 p1| e| r| 2294 * | ??? | <P | ==P | >P | ??? | 2295 * }</pre> 2296 * 2297 * <p>This requires that {@code start != end}. However it handles 2298 * {@code left == start} and/or {@code end == right}. 2299 * 2300 * @param a Data array. 2301 * @param left Lower bound (inclusive). 2302 * @param right Upper bound (inclusive). 2303 * @param start Start of the partition range (inclusive). 2304 * @param end End of the partitioned range (inclusive). 2305 * @param pivot0 Lower pivot location (inclusive). 2306 * @param pivot1 Upper pivot location (inclusive). 2307 * @param upper Upper bound (inclusive) of the pivot range [k1]. 2308 * @return Lower bound (inclusive) of the pivot range [k0]. 2309 */ 2310 // package-private for testing 2311 static int expandPartition(int[] a, int left, int right, int start, int end, 2312 int pivot0, int pivot1, int[] upper) { 2313 // 3-way partition of the data using a pivot value into 2314 // less-than, equal or greater-than. 2315 // Based on Sedgewick's Bentley-McIroy partitioning: always swap i<->j then 2316 // check for equal to the pivot and move again. 2317 // 2318 // Move sentinels from start and end to left and right. Scan towards the 2319 // sentinels until >=,<=. Swap then move == to the pivot region. 2320 // <-i j-> 2321 // |l | | |p0 p1| | | r| 2322 // |>=| ??? | < | == | > | ??? |<=| 2323 // 2324 // When either i or j reach the edge perform finishing loop. 2325 // Finish loop for a[j] <= v replaces j with p1+1, optionally moves value 2326 // to p0 for < and updates the pivot range p1 (and optionally p0): 2327 // j-> 2328 // |l |p0 p1| | | r| 2329 // | < | == | > | ??? |<=| 2330 2331 final int v = a[pivot0]; 2332 // Use start/end as sentinels (requires start != end) 2333 int vi = a[start]; 2334 int vj = a[end]; 2335 a[start] = a[left]; 2336 a[end] = a[right]; 2337 a[left] = vj; 2338 a[right] = vi; 2339 2340 int i = start + 1; 2341 int j = end - 1; 2342 2343 // Positioned for pre-in/decrement to write to pivot region 2344 int p0 = pivot0 == start ? i : pivot0; 2345 int p1 = pivot1 == end ? j : pivot1; 2346 2347 while (true) { 2348 do { 2349 --i; 2350 } while (a[i] < v); 2351 do { 2352 ++j; 2353 } while (a[j] > v); 2354 vj = a[i]; 2355 vi = a[j]; 2356 a[i] = vi; 2357 a[j] = vj; 2358 // Move the equal values to pivot region 2359 if (vi == v) { 2360 a[i] = a[--p0]; 2361 a[p0] = v; 2362 } 2363 if (vj == v) { 2364 a[j] = a[++p1]; 2365 a[p1] = v; 2366 } 2367 // Termination check and finishing loops. 2368 // Note: This works even if pivot region is zero length (p1 == p0-1 due to 2369 // length 1 pivot region at either start/end) because we pre-inc/decrement 2370 // one side and post-inc/decrement the other side. 2371 if (i == left) { 2372 while (j < right) { 2373 do { 2374 ++j; 2375 } while (a[j] > v); 2376 final int w = a[j]; 2377 // Move upper bound of pivot region 2378 a[j] = a[++p1]; 2379 a[p1] = v; 2380 // Move lower bound of pivot region 2381 if (w != v) { 2382 a[p0] = w; 2383 p0++; 2384 } 2385 } 2386 break; 2387 } 2388 if (j == right) { 2389 while (i > left) { 2390 do { 2391 --i; 2392 } while (a[i] < v); 2393 final int w = a[i]; 2394 // Move lower bound of pivot region 2395 a[i] = a[--p0]; 2396 a[p0] = v; 2397 // Move upper bound of pivot region 2398 if (w != v) { 2399 a[p1] = w; 2400 p1--; 2401 } 2402 } 2403 break; 2404 } 2405 } 2406 2407 upper[0] = p1; 2408 return p0; 2409 } 2410 2411 /** 2412 * Partition the array such that indices {@code k} correspond to their 2413 * correctly sorted value in the equivalent fully sorted array. 2414 * 2415 * <p>For all indices {@code k} and any index {@code i}: 2416 * 2417 * <pre>{@code 2418 * data[i < k] <= data[k] <= data[k < i] 2419 * }</pre> 2420 * 2421 * <p>This function accepts a {@link UpdatingInterval} of indices {@code k} that define the 2422 * range of indices to partition. The {@link UpdatingInterval} can be narrowed or split as 2423 * partitioning divides the range. 2424 * 2425 * <p>Uses an introselect variant. The quickselect is a dual-pivot quicksort 2426 * partition method by Vladimir Yaroslavskiy; the fall-back on poor convergence of 2427 * the quickselect is a heapselect. 2428 * 2429 * <p>The {@code flags} contain the the current recursion count and the configured 2430 * length threshold for {@code r - l} to perform sort select. The count is in the upper 2431 * bits and the threshold is in the lower bits. 2432 * 2433 * @param a Values. 2434 * @param left Lower bound of data (inclusive, assumed to be strictly positive). 2435 * @param right Upper bound of data (inclusive, assumed to be strictly positive). 2436 * @param k Interval of indices to partition (ordered). 2437 * @param flags Control flags. 2438 */ 2439 // package-private for testing 2440 static void dualPivotQuickSelect(int[] a, int left, int right, UpdatingInterval k, int flags) { 2441 // If partitioning splits the interval then recursion is used for the left-most side(s) 2442 // and the right-most side remains within this function. If partitioning does 2443 // not split the interval then it remains within this function. 2444 int l = left; 2445 int r = right; 2446 int f = flags; 2447 int ka = k.left(); 2448 int kb = k.right(); 2449 final int[] upper = {0, 0, 0}; 2450 while (true) { 2451 // Select when ka and kb are close to the same end, 2452 // or the entire range is small 2453 // |l|-----|ka|--------|kb|------|r| 2454 final int n = r - l; 2455 if (Math.min(kb - l, r - ka) < DP_SORTSELECT_SIZE || 2456 n < (f & SORTSELECT_MASK)) { 2457 sortSelect(a, l, r, ka, kb); 2458 return; 2459 } 2460 if (kb - ka < DP_SORTSELECT_SIZE) { 2461 // Switch to single-pivot mode with Floyd-Rivest sub-sampling 2462 quickSelectAdaptive(a, l, r, ka, kb, upper, MODE_FR_SAMPLING); 2463 return; 2464 } 2465 if (f < 0) { 2466 // Excess recursion, switch to heap select 2467 heapSelect(a, l, r, ka, kb); 2468 return; 2469 } 2470 2471 // Dual-pivot partitioning 2472 final int p0 = partition(a, l, r, upper); 2473 final int p1 = upper[0]; 2474 2475 // Recursion to max depth 2476 // Note: Here we possibly branch left, middle and right with multiple keys. 2477 // It is possible that the partition has split the keys 2478 // and the recursion proceeds with a reduced set in each region. 2479 // p0 p1 p2 p3 2480 // |l|--|ka|--k----k--|P|------k--|kb|----|P|----|r| 2481 // kb | ka 2482 f += RECURSION_INCREMENT; 2483 // Recurse left side if required 2484 if (ka < p0) { 2485 if (kb <= p1) { 2486 // Entirely on left side 2487 r = p0 - 1; 2488 if (r < kb) { 2489 kb = k.updateRight(r); 2490 } 2491 continue; 2492 } 2493 dualPivotQuickSelect(a, l, p0 - 1, k.splitLeft(p0, p1), f); 2494 // Here we must process middle and/or right 2495 ka = k.left(); 2496 } else if (kb <= p1) { 2497 // No middle/right side 2498 return; 2499 } else if (ka <= p1) { 2500 // Advance lower bound 2501 ka = k.updateLeft(p1 + 1); 2502 } 2503 // Recurse middle if required 2504 final int p2 = upper[1]; 2505 final int p3 = upper[2]; 2506 if (ka < p2) { 2507 l = p1 + 1; 2508 if (kb <= p3) { 2509 // Entirely in middle 2510 r = p2 - 1; 2511 if (r < kb) { 2512 kb = k.updateRight(r); 2513 } 2514 continue; 2515 } 2516 dualPivotQuickSelect(a, l, p2 - 1, k.splitLeft(p2, p3), f); 2517 ka = k.left(); 2518 } else if (kb <= p3) { 2519 // No right side 2520 return; 2521 } else if (ka <= p3) { 2522 ka = k.updateLeft(p3 + 1); 2523 } 2524 // Continue right 2525 l = p3 + 1; 2526 } 2527 } 2528 2529 /** 2530 * Partition an array slice around 2 pivots. Partitioning exchanges array elements 2531 * such that all elements smaller than pivot are before it and all elements larger 2532 * than pivot are after it. 2533 * 2534 * <p>This method returns 4 points describing the pivot ranges of equal values. 2535 * 2536 * <pre>{@code 2537 * |k0 k1| |k2 k3| 2538 * | <P | ==P1 | <P1 && <P2 | ==P2 | >P | 2539 * }</pre> 2540 * 2541 * <ul> 2542 * <li>k0: lower pivot1 point 2543 * <li>k1: upper pivot1 point (inclusive) 2544 * <li>k2: lower pivot2 point 2545 * <li>k3: upper pivot2 point (inclusive) 2546 * </ul> 2547 * 2548 * <p>Bounds are set so {@code i < k0}, {@code i > k3} and {@code k1 < i < k2} are 2549 * unsorted. When the range {@code [k0, k3]} contains fully sorted elements the result 2550 * is set to {@code k1 = k3; k2 == k0}. This can occur if 2551 * {@code P1 == P2} or there are zero or one value between the pivots 2552 * {@code P1 < v < P2}. Any sort/partition of ranges [left, k0-1], [k1+1, k2-1] and 2553 * [k3+1, right] must check the length is {@code > 1}. 2554 * 2555 * @param a Data array. 2556 * @param left Lower bound (inclusive). 2557 * @param right Upper bound (inclusive). 2558 * @param bounds Points [k1, k2, k3]. 2559 * @return Lower bound (inclusive) of the pivot range [k0]. 2560 */ 2561 private static int partition(int[] a, int left, int right, int[] bounds) { 2562 // Pick 2 pivots from 5 approximately uniform through the range. 2563 // Spacing is ~ 1/7 made using shifts. Other strategies are equal or much 2564 // worse. 1/7 = 5/35 ~ 1/8 + 1/64 : 0.1429 ~ 0.1406 2565 // Ensure the value is above zero to choose different points! 2566 final int n = right - left; 2567 final int step = 1 + (n >>> 3) + (n >>> 6); 2568 final int i3 = left + (n >>> 1); 2569 final int i2 = i3 - step; 2570 final int i1 = i2 - step; 2571 final int i4 = i3 + step; 2572 final int i5 = i4 + step; 2573 Sorting.sort5(a, i1, i2, i3, i4, i5); 2574 2575 // Partition data using pivots P1 and P2 into less-than, greater-than or between. 2576 // Pivot values P1 & P2 are placed at the end. If P1 < P2, P2 acts as a sentinel. 2577 // k traverses the unknown region ??? and values moved if less-than or 2578 // greater-than: 2579 // 2580 // left less k great right 2581 // |P1| <P1 | P1 <= & <= P2 | ??? | >P2 |P2| 2582 // 2583 // <P1 (left, lt) 2584 // P1 <= & <= P2 [lt, k) 2585 // >P2 (gt, right) 2586 // 2587 // At the end pivots are swapped back to behind the less and great pointers. 2588 // 2589 // | <P1 |P1| P1<= & <= P2 |P2| >P2 | 2590 2591 // Swap ends to the pivot locations. 2592 final int v1 = a[i2]; 2593 a[i2] = a[left]; 2594 a[left] = v1; 2595 final int v2 = a[i4]; 2596 a[i4] = a[right]; 2597 a[right] = v2; 2598 2599 // pointers 2600 int less = left; 2601 int great = right; 2602 2603 // Fast-forward ascending / descending runs to reduce swaps. 2604 // Cannot overrun as end pivots (v1 <= v2) act as sentinels. 2605 do { 2606 ++less; 2607 } while (a[less] < v1); 2608 do { 2609 --great; 2610 } while (a[great] > v2); 2611 2612 // a[less - 1] < P1 : a[great + 1] > P2 2613 // unvisited in [less, great] 2614 SORTING: 2615 for (int k = less - 1; ++k <= great;) { 2616 final int v = a[k]; 2617 if (v < v1) { 2618 // swap(a, k, less++) 2619 a[k] = a[less]; 2620 a[less] = v; 2621 less++; 2622 } else if (v > v2) { 2623 // while k < great and a[great] > v2: 2624 // great-- 2625 while (a[great] > v2) { 2626 if (great-- == k) { 2627 // Done 2628 break SORTING; 2629 } 2630 } 2631 // swap(a, k, great--) 2632 // if a[k] < v1: 2633 // swap(a, k, less++) 2634 final int w = a[great]; 2635 a[great] = v; 2636 great--; 2637 // delay a[k] = w 2638 if (w < v1) { 2639 a[k] = a[less]; 2640 a[less] = w; 2641 less++; 2642 } else { 2643 a[k] = w; 2644 } 2645 } 2646 } 2647 2648 // Change to inclusive ends : a[less] < P1 : a[great] > P2 2649 less--; 2650 great++; 2651 // Move the pivots to correct locations 2652 a[left] = a[less]; 2653 a[less] = v1; 2654 a[right] = a[great]; 2655 a[great] = v2; 2656 2657 // Record the pivot locations 2658 final int lower = less; 2659 bounds[2] = great; 2660 2661 // equal elements 2662 // Original paper: If middle partition is bigger than a threshold 2663 // then check for equal elements. 2664 2665 // Note: This is extra work. When performing partitioning the region of interest 2666 // may be entirely above or below the central region and this can be skipped. 2667 2668 // Here we look for equal elements if the centre is more than 5/8 the length. 2669 // 5/8 = 1/2 + 1/8. Pivots must be different. 2670 if ((great - less) > (n >>> 1) + (n >>> 3) && v1 != v2) { 2671 2672 // Fast-forward to reduce swaps. Changes inclusive ends to exclusive ends. 2673 // Since v1 != v2 these act as sentinels to prevent overrun. 2674 do { 2675 ++less; 2676 } while (a[less] == v1); 2677 do { 2678 --great; 2679 } while (a[great] == v2); 2680 2681 // This copies the logic in the sorting loop using == comparisons 2682 EQUAL: 2683 for (int k = less - 1; ++k <= great;) { 2684 final int v = a[k]; 2685 if (v == v1) { 2686 a[k] = a[less]; 2687 a[less] = v; 2688 less++; 2689 } else if (v == v2) { 2690 while (a[great] == v2) { 2691 if (great-- == k) { 2692 // Done 2693 break EQUAL; 2694 } 2695 } 2696 final int w = a[great]; 2697 a[great] = v; 2698 great--; 2699 if (w == v1) { 2700 a[k] = a[less]; 2701 a[less] = w; 2702 less++; 2703 } else { 2704 a[k] = w; 2705 } 2706 } 2707 } 2708 2709 // Change to inclusive ends 2710 less--; 2711 great++; 2712 } 2713 2714 // Between pivots in (less, great) 2715 if (v1 != v2 && less < great - 1) { 2716 // Record the pivot end points 2717 bounds[0] = less; 2718 bounds[1] = great; 2719 } else { 2720 // No unsorted internal region (set k1 = k3; k2 = k0) 2721 bounds[0] = bounds[2]; 2722 bounds[1] = lower; 2723 } 2724 2725 return lower; 2726 } 2727 2728 /** 2729 * Map the distance from the edge of {@code [l, r]} to a new distance in {@code [0, n)}. 2730 * 2731 * <p>The provides the adaption {@code kf'/|A|} from Alexandrescu (2016) where 2732 * {@code k == d}, {@code f' == n} and {@code |A| == r-l+1}. 2733 * 2734 * <p>For convenience this accepts the input range {@code [l, r]}. 2735 * 2736 * @param d Distance from the edge in {@code [0, r - l]}. 2737 * @param l Lower bound (inclusive). 2738 * @param r Upper bound (inclusive). 2739 * @param n Size of the new range. 2740 * @return the mapped distance in [0, n) 2741 */ 2742 private static int mapDistance(int d, int l, int r, int n) { 2743 return (int) (d * (n - 1.0) / (r - l)); 2744 } 2745 2746 /** 2747 * Configure the dual-pivot control flags. This packs the maximum recursion depth and 2748 * sort select size into a single integer. 2749 * 2750 * @param left Lower bound (inclusive). 2751 * @param right Upper bound (inclusive). 2752 * @param k1 First key of interest. 2753 * @param kn Last key of interest. 2754 * @return the flags 2755 */ 2756 private static int dualPivotFlags(int left, int right, int k1, int kn) { 2757 final int maxDepth = dualPivotMaxDepth(right - left); 2758 final int ss = dualPivotSortSelectSize(k1, kn); 2759 return dualPivotFlags(maxDepth, ss); 2760 } 2761 2762 /** 2763 * Configure the dual-pivot control flags. This packs the maximum recursion depth and 2764 * sort select size into a single integer. 2765 * 2766 * @param maxDepth Maximum recursion depth. 2767 * @param ss Sort select size. 2768 * @return the flags 2769 */ 2770 static int dualPivotFlags(int maxDepth, int ss) { 2771 // The flags are packed using the upper bits to count back from -1 in 2772 // step sizes. The lower bits pack the sort select size. 2773 int flags = Integer.MIN_VALUE - maxDepth * RECURSION_INCREMENT; 2774 flags &= ~SORTSELECT_MASK; 2775 return flags | ss; 2776 } 2777 2778 /** 2779 * Compute the maximum recursion depth for dual pivot recursion. 2780 * This is an approximation to {@code 2 * log3 (x)}. 2781 * 2782 * <p>The result is between {@code 2*floor(log3(x))} and {@code 2*ceil(log3(x))}. 2783 * The result is correctly rounded when {@code x +/- 1} is a power of 3. 2784 * 2785 * @param x Value. 2786 * @return maximum recursion depth 2787 */ 2788 static int dualPivotMaxDepth(int x) { 2789 // log3(2) ~ 1.5849625 2790 // log3(x) ~ log2(x) * 0.630929753... ~ log2(x) * 323 / 512 (0.630859375) 2791 // Use (floor(log2(x))+1) * 323 / 256 2792 return ((32 - Integer.numberOfLeadingZeros(x)) * 323) >>> 8; 2793 } 2794 2795 /** 2796 * Configure the sort select size for dual pivot partitioning. 2797 * 2798 * @param k1 First key of interest. 2799 * @param kn Last key of interest. 2800 * @return the sort select size. 2801 */ 2802 private static int dualPivotSortSelectSize(int k1, int kn) { 2803 // Configure the sort select size based on the index density 2804 // l---k1---k---k-----k--k------kn----r 2805 // 2806 // For a full sort the dual-pivot quicksort can switch to insertion sort 2807 // when the length is small. The optimum value is dependent on the 2808 // hardware and the insertion sort implementation. Benchmarks show that 2809 // insertion sort can be used at length 80-120. 2810 // 2811 // During selection the SORTSELECT_SIZE specifies the distance from the edge 2812 // to use sort select. When keys are not dense there may be a small length 2813 // that is ignored by sort select due to the presence of another key. 2814 // Diagram of k-l = SORTSELECT_SIZE and r-k < SORTSELECT_SIZE where a second 2815 // key b is blocking the use of sort select. The key b is closest it can be to the right 2816 // key to enable blocking; it could be further away (up to k = left). 2817 // 2818 // |--SORTSELECT_SIZE--| 2819 // |--SORTSELECT_SIZE--| 2820 // l--b----------------k--r 2821 // l----b--------------k----r 2822 // l------b------------k------r 2823 // l--------b----------k--------r 2824 // l----------b--------k----------r 2825 // l------------b------k------------r 2826 // l--------------b----k--------------r 2827 // l----------------b--k----------------r 2828 // l------------------bk------------------r 2829 // |--SORTSELECT_SIZE--| 2830 // 2831 // For all these cases the partitioning method would have to run. Assuming ideal 2832 // dual-pivot partitioning into thirds, and that the left key is randomly positioned 2833 // in [left, k) it is more likely that after partitioning 2 partitions will have to 2834 // be processed rather than 1 partition. In this case the options are: 2835 // - split the range using partitioning; sort select next iteration 2836 // - use sort select with a edge distance above the optimum length for single k selection 2837 // 2838 // Contrast with a longer length: 2839 // |--SORTSELECT_SIZE--| 2840 // l-------------------k-----k-------k-------------------r 2841 // |--SORTSELECT_SIZE--| 2842 // Here partitioning has to run and 1, 2, or 3 partitions processed. But all k can 2843 // be found with a sort. In this case sort select could be used with a much higher 2844 // length (e.g. 80 - 120). 2845 // 2846 // When keys are extremely sparse (never within SORTSELECT_SIZE) then no switch 2847 // to sort select based on length is *required*. It may still be beneficial to avoid 2848 // partitioning if the length is very small due to the overhead of partitioning. 2849 // 2850 // Benchmarking with different lengths for a switch to sort select show inconsistent 2851 // behaviour across platforms due to the variable speed of insertion sort at longer 2852 // lengths. Attempts to transition the length based on various ramps schemes can 2853 // be incorrect and result is a slowdown rather than speed-up (if the correct threshold 2854 // is not chosen). 2855 // 2856 // Here we use a much simpler scheme based on these observations: 2857 // - If the average separation is very high then no length will collect extra indices 2858 // from a sort select over the current trigger of using the distance from the end. But 2859 // using a length dependence will not effect the work done by sort select as it only 2860 // performs the minimum sorting required. 2861 // - If the average separation is within the SORTSELECT_SIZE then a round of 2862 // partitioning will create multiple regions that all require a sort selection. 2863 // Thus a partitioning round can be avoided if the length is small. 2864 // - If the keys are at the end with nothing in between then partitioning will be able 2865 // to split them but a sort will have to sort the entire range: 2866 // lk-------------------------------kr 2867 // After partitioning starts the chance of keys being at the ends is low as keys 2868 // should be random within the divided range. 2869 // - Extremely high density keys is rare. It is only expected to saturate the range 2870 // with short lengths, e.g. 100 quantiles for length 1000 = separation 10 (high density) 2871 // but for length 10000 = separation 100 (low density). 2872 // - The density of (non-uniform) keys is hard to predict without complex analysis. 2873 // 2874 // Benchmarking using random keys at various density show no performance loss from 2875 // using a fixed size for the length dependence of sort select, if the size is small. 2876 // A large length can impact performance with low density keys, and on machines 2877 // where insertion sort is slower. Extreme performance gains occur when the average 2878 // separation of random keys is below 8-16, or of uniform keys around 32, by using a 2879 // sort at lengths up to 90. But this threshold shows performance loss greater than 2880 // the gains with separation of 64-128 on random keys, and on machines with slow 2881 // insertion sort. The transition to using an insertion sort of a longer length 2882 // is difficult to predict for all situations. 2883 2884 // Let partitioning run if the initial length is small. 2885 // Use kn - k1 as a proxy for the length. If length is actually very large then 2886 // the final selection is insignificant. This avoids slowdown for small lengths 2887 // where the keys may only be at the ends. Note ideal dual-pivot partitioning 2888 // creates thirds so 1 iteration on SORTSELECT_SIZE * 3 should create 2889 // SORTSELECT_SIZE partitions. 2890 if (kn - k1 < DP_SORTSELECT_SIZE * 3) { 2891 return 0; 2892 } 2893 // Here partitioning will run at least once. 2894 // Stable performance across platforms using a modest length dependence. 2895 return DP_SORTSELECT_SIZE * 2; 2896 } 2897 }