LUCENE-8623: Decrease I/O pressure when merging high dimensional points
authoriverase <ivera@apache.org>
Mon, 14 Jan 2019 06:56:06 +0000 (07:56 +0100)
committeriverase <ivera@apache.org>
Mon, 14 Jan 2019 06:56:06 +0000 (07:56 +0100)
lucene/CHANGES.txt
lucene/core/src/java/org/apache/lucene/util/bkd/BKDWriter.java

index 3b98955..b570c48 100644 (file)
@@ -321,6 +321,8 @@ Optimizations
   rather than an in-place mergesort, which needs to perform fewer swaps.
   (Adrien Grand)
 
+* LUCENE-8623: Decrease I/O pressure when merging high dimensional points. (Ignacio Vera)
+
 Test Framework
 
 * LUCENE-8604: TestRuleLimitSysouts now has an optional "hard limit" of bytes that can be written
index 39d05b0..676896f 100644 (file)
@@ -762,11 +762,6 @@ public class BKDWriter implements Closeable {
   // encoding and not have our own ByteSequencesReader/Writer
 
   /** Sort the heap writer by the specified dim */
-  private void sortHeapPointWriter(final HeapPointWriter writer, int dim) {
-    final int pointCount = Math.toIntExact(this.pointCount);
-    sortHeapPointWriter(writer, pointCount, dim);
-  }
-  /** Sort the heap writer by the specified dim */
   private void sortHeapPointWriter(final HeapPointWriter writer, int pointCount, int dim) {
     // Tie-break by docID:
 
@@ -841,96 +836,131 @@ public class BKDWriter implements Closeable {
   }
   */
 
+  //return a new point writer sort by the provided dimension from input data
   private PointWriter sort(int dim) throws IOException {
     assert dim >= 0 && dim < numDataDims;
 
     if (heapPointWriter != null) {
-
       assert tempInput == null;
-
       // We never spilled the incoming points to disk, so now we sort in heap:
-      HeapPointWriter sorted;
-
-      if (dim == 0) {
-        // First dim can re-use the current heap writer
-        sorted = heapPointWriter;
-      } else {
-        // Subsequent dims need a private copy
-        sorted = new HeapPointWriter((int) pointCount, (int) pointCount, packedBytesLength, longOrds, singleValuePerDoc);
-        sorted.copyFrom(heapPointWriter);
-      }
-
+      HeapPointWriter sorted = heapPointWriter;
       //long t0 = System.nanoTime();
-      sortHeapPointWriter(sorted, dim);
+      sortHeapPointWriter(sorted, Math.toIntExact(this.pointCount), dim);
       //long t1 = System.nanoTime();
       //System.out.println("BKD: sort took " + ((t1-t0)/1000000.0) + " msec");
-
       sorted.close();
+      heapPointWriter = null;
       return sorted;
     } else {
-
       // Offline sort:
       assert tempInput != null;
+      OfflinePointWriter sorted = sortOffLine(dim, tempInput.getName(), 0, pointCount);
+      tempDir.deleteFile(tempInput.getName());
+      tempInput = null;
+      return sorted;
+    }
+  }
 
-      final int offset = bytesPerDim * dim;
+  //return a new point writer sort by the provided dimension from start to start + pointCount
+  private PointWriter sort(int dim, PointWriter writer, final long start, final long pointCount) throws IOException {
+    assert dim >= 0 && dim < numDataDims;
 
-      Comparator<BytesRef> cmp;
-      if (dim == numDataDims - 1) {
-        // in that case the bytes for the dimension and for the doc id are contiguous,
-        // so we don't need a branch
-        cmp = new BytesRefComparator(bytesPerDim + Integer.BYTES) {
-          @Override
-          protected int byteAt(BytesRef ref, int i) {
+    if (writer instanceof HeapPointWriter) {
+      HeapPointWriter heapPointWriter = createHeapPointWriterCopy((HeapPointWriter) writer, start, pointCount);
+      sortHeapPointWriter(heapPointWriter, Math.toIntExact(pointCount), dim);
+      return heapPointWriter;
+    } else {
+      OfflinePointWriter offlinePointWriter = (OfflinePointWriter) writer;
+      return sortOffLine(dim, offlinePointWriter.name, start, pointCount);
+    }
+  }
+
+  // sort a given file on a given dimension for start to start + point count
+  private OfflinePointWriter sortOffLine(int dim, String inputName, final long start, final long pointCount) throws IOException {
+
+    final int offset = bytesPerDim * dim;
+
+    Comparator<BytesRef> cmp;
+    if (dim == numDataDims - 1) {
+      // in that case the bytes for the dimension and for the doc id are contiguous,
+      // so we don't need a branch
+      cmp = new BytesRefComparator(bytesPerDim + Integer.BYTES) {
+        @Override
+        protected int byteAt(BytesRef ref, int i) {
+          return ref.bytes[ref.offset + offset + i] & 0xff;
+        }
+      };
+    } else {
+      cmp = new BytesRefComparator(bytesPerDim + Integer.BYTES) {
+        @Override
+        protected int byteAt(BytesRef ref, int i) {
+          if (i < bytesPerDim) {
             return ref.bytes[ref.offset + offset + i] & 0xff;
+          } else {
+            return ref.bytes[ref.offset + packedBytesLength + i - bytesPerDim] & 0xff;
           }
-        };
-      } else {
-        cmp = new BytesRefComparator(bytesPerDim + Integer.BYTES) {
+        }
+      };
+    }
+
+    OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc, null, 0) {
+      /**
+       * We write/read fixed-byte-width file that {@link OfflinePointReader} can read.
+       */
+      @Override
+      protected ByteSequencesWriter getWriter(IndexOutput out, long count) {
+        return new ByteSequencesWriter(out) {
           @Override
-          protected int byteAt(BytesRef ref, int i) {
-            if (i < bytesPerDim) {
-              return ref.bytes[ref.offset + offset + i] & 0xff;
-            } else {
-              return ref.bytes[ref.offset + packedBytesLength + i - bytesPerDim] & 0xff;
-            }
+          public void write(byte[] bytes, int off, int len) throws IOException {
+            assert len == bytesPerDoc : "len=" + len + " bytesPerDoc=" + bytesPerDoc;
+            out.writeBytes(bytes, off, len);
           }
         };
       }
 
-      OfflineSorter sorter = new OfflineSorter(tempDir, tempFileNamePrefix + "_bkd" + dim, cmp, offlineSorterBufferMB, offlineSorterMaxTempFiles, bytesPerDoc, null, 0) {
-
-          /** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
-          @Override
-          protected ByteSequencesWriter getWriter(IndexOutput out, long count) {
-            return new ByteSequencesWriter(out) {
-              @Override
-              public void write(byte[] bytes, int off, int len) throws IOException {
-                assert len == bytesPerDoc: "len=" + len + " bytesPerDoc=" + bytesPerDoc;
-                out.writeBytes(bytes, off, len);
-              }
-            };
-          }
+      /**
+       * We write/read fixed-byte-width file that {@link OfflinePointReader} can read.
+       */
+      @Override
+      protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) throws IOException {
+        //This allows to read only a subset of the original file
+        long startPointer = (name.equals(inputName)) ? bytesPerDoc * start : in.getFilePointer();
+        long endPointer = (name.equals(inputName)) ? startPointer + bytesPerDoc * pointCount : Long.MAX_VALUE;
+        in.seek(startPointer);
+        return new ByteSequencesReader(in, name) {
+          final BytesRef scratch = new BytesRef(new byte[bytesPerDoc]);
 
-          /** We write/read fixed-byte-width file that {@link OfflinePointReader} can read. */
           @Override
-          protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) throws IOException {
-            return new ByteSequencesReader(in, name) {
-              final BytesRef scratch = new BytesRef(new byte[bytesPerDoc]);
-              @Override
-              public BytesRef next() throws IOException {
-                if (in.getFilePointer() >= end) {
-                  return null;
-                }
-                in.readBytes(scratch.bytes, 0, bytesPerDoc);
-                return scratch;
-              }
-            };
+          public BytesRef next() throws IOException {
+            if (in.getFilePointer() >= end) {
+              return null;
+            } else if (in.getFilePointer() >= endPointer) {
+              in.seek(end);
+              return null;
+            }
+            in.readBytes(scratch.bytes, 0, bytesPerDoc);
+            return scratch;
           }
         };
+      }
+    };
 
-      String name = sorter.sort(tempInput.getName());
+    String name = sorter.sort(inputName);
+    return new OfflinePointWriter(tempDir, name, packedBytesLength, pointCount, longOrds, singleValuePerDoc);
+  }
 
-      return new OfflinePointWriter(tempDir, name, packedBytesLength, pointCount, longOrds, singleValuePerDoc);
+  private HeapPointWriter createHeapPointWriterCopy(HeapPointWriter writer, long start, long count) throws IOException {
+    //TODO: Can we do this faster?
+    int size = Math.toIntExact(count);
+    try (HeapPointWriter pointWriter = new HeapPointWriter(size, size, packedBytesLength, longOrds, singleValuePerDoc);
+         PointReader reader = writer.getReader(start, count)) {
+     for (long i =0; i < count; i++) {
+       reader.next();
+       pointWriter.append(reader.packedValue(), reader.ord(), reader.docID());
+     }
+     return pointWriter;
+    } catch (Throwable t) {
+      throw verifyChecksum(t, writer);
     }
   }
 
@@ -994,7 +1024,7 @@ public class BKDWriter implements Closeable {
     // Make sure the math above "worked":
     assert pointCount / numLeaves <= maxPointsInLeafNode: "pointCount=" + pointCount + " numLeaves=" + numLeaves + " maxPointsInLeafNode=" + maxPointsInLeafNode;
 
-    // Sort all docs once by each dimension:
+    // Slices are created as they are needed
     PathSlice[] sortedPointWriters = new PathSlice[numIndexDims];
 
     // This is only used on exception; on normal code paths we close all files we opened:
@@ -1002,20 +1032,6 @@ public class BKDWriter implements Closeable {
 
     boolean success = false;
     try {
-      //long t0 = System.nanoTime();
-      for(int dim=0;dim<numIndexDims;dim++) {
-        sortedPointWriters[dim] = new PathSlice(sort(dim), 0, pointCount);
-      }
-      //long t1 = System.nanoTime();
-      //System.out.println("sort time: " + ((t1-t0)/1000000.0) + " msec");
-
-      if (tempInput != null) {
-        tempDir.deleteFile(tempInput.getName());
-        tempInput = null;
-      } else {
-        assert heapPointWriter != null;
-        heapPointWriter = null;
-      }
 
       final int[] parentSplits = new int[numIndexDims];
       build(1, numLeaves, sortedPointWriters,
@@ -1027,10 +1043,6 @@ public class BKDWriter implements Closeable {
             toCloseHeroically);
       assert Arrays.equals(parentSplits, new int[numIndexDims]);
 
-      for(PathSlice slice : sortedPointWriters) {
-        slice.writer.destroy();
-      }
-
       // If no exception, we should have cleaned everything up:
       assert tempDir.getCreatedFiles().isEmpty();
       //long t2 = System.nanoTime();
@@ -1443,7 +1455,7 @@ public class BKDWriter implements Closeable {
       boolean result = reader.next();
       assert result: "rightCount=" + rightCount + " source.count=" + source.count + " source.writer=" + source.writer;
       System.arraycopy(reader.packedValue(), splitDim*bytesPerDim, scratch1, 0, bytesPerDim);
-      if (numIndexDims > 1) {
+      if (numIndexDims > 1 && ordBitSet != null) {
         assert ordBitSet.get(reader.ord()) == false;
         ordBitSet.set(reader.ord());
         // Subtract 1 from rightCount because we already did the first value above (so we could record the split value):
@@ -1677,15 +1689,6 @@ public class BKDWriter implements Closeable {
                      long[] leafBlockFPs,
                      List<Closeable> toCloseHeroically) throws IOException {
 
-    for (PathSlice slice : slices) {
-      assert slice.count == slices[0].count;
-    }
-
-    if (numDataDims == 1 && slices[0].writer instanceof OfflinePointWriter && slices[0].count <= maxPointsSortInHeap) {
-      // Special case for 1D, to cutover to heap once we recurse deeply enough:
-      slices[0] = switchToHeap(slices[0], toCloseHeroically);
-    }
-
     if (nodeID >= leafNodeOffset) {
 
       // Leaf node: write block
@@ -1695,10 +1698,20 @@ public class BKDWriter implements Closeable {
       int sortedDimCardinality = Integer.MAX_VALUE;
 
       for (int dim=0;dim<numIndexDims;dim++) {
+        //create a slice if it does not exist
+        boolean created = false;
+        if (slices[dim] == null) {
+          createPathSlice(slices, dim);
+          created = true;
+        }
         if (slices[dim].writer instanceof HeapPointWriter == false) {
           // Adversarial cases can cause this, e.g. very lopsided data, all equal points, such that we started
           // offline, but then kept splitting only in one dimension, and so never had to rewrite into heap writer
+          PathSlice slice = slices[dim];
           slices[dim] = switchToHeap(slices[dim], toCloseHeroically);
+          if (created) {
+            slice.writer.destroy();
+          }
         }
 
         PathSlice source = slices[dim];
@@ -1819,6 +1832,12 @@ public class BKDWriter implements Closeable {
         splitDim = 0;
       }
 
+      //We delete the created path slices at the same level
+      boolean deleteSplitDim = false;
+      if (slices[splitDim] == null) {
+        createPathSlice(slices, splitDim);
+        deleteSplitDim = true;
+      }
       PathSlice source = slices[splitDim];
 
       assert nodeID < splitPackedValues.length: "nodeID=" + nodeID + " splitValues.length=" + splitPackedValues.length;
@@ -1827,7 +1846,16 @@ public class BKDWriter implements Closeable {
       long rightCount = source.count / 2;
       long leftCount = source.count - rightCount;
 
-      byte[] splitValue = markRightTree(rightCount, splitDim, source, ordBitSet);
+      // When we are on this dim, below, we clear the ordBitSet:
+      int dimToClear = numIndexDims - 1;
+      while (dimToClear >= 0) {
+        if (slices[dimToClear] != null && splitDim != dimToClear) {
+          break;
+        }
+        dimToClear--;
+      }
+
+      byte[] splitValue = (dimToClear == -1) ? markRightTree(rightCount, splitDim, source, null) : markRightTree(rightCount, splitDim, source, ordBitSet);
       int address = nodeID * (1+bytesPerDim);
       splitPackedValues[address] = (byte) splitDim;
       System.arraycopy(splitValue, 0, splitPackedValues, address + 1, bytesPerDim);
@@ -1843,16 +1871,11 @@ public class BKDWriter implements Closeable {
       byte[] maxSplitPackedValue = new byte[packedIndexBytesLength];
       System.arraycopy(maxPackedValue, 0, maxSplitPackedValue, 0, packedIndexBytesLength);
 
-      // When we are on this dim, below, we clear the ordBitSet:
-      int dimToClear;
-      if (numIndexDims - 1 == splitDim) {
-        dimToClear = numIndexDims - 2;
-      } else {
-        dimToClear = numIndexDims - 1;
-      }
 
       for(int dim=0;dim<numIndexDims;dim++) {
-
+        if (slices[dim] == null) {
+          continue;
+        }
         if (dim == splitDim) {
           // No need to partition on this dim since it's a simple slice of the incoming already sorted slice, and we
           // will re-use its shared reader when visiting it as we recurse:
@@ -1890,7 +1913,7 @@ public class BKDWriter implements Closeable {
             splitPackedValues, leafBlockFPs, toCloseHeroically);
       for(int dim=0;dim<numIndexDims;dim++) {
         // Don't destroy the dim we split on because we just re-used what our caller above gave us for that dim:
-        if (dim != splitDim) {
+        if (dim != splitDim && slices[dim] != null) {
           leftSlices[dim].writer.destroy();
         }
       }
@@ -1903,11 +1926,30 @@ public class BKDWriter implements Closeable {
             splitPackedValues, leafBlockFPs, toCloseHeroically);
       for(int dim=0;dim<numIndexDims;dim++) {
         // Don't destroy the dim we split on because we just re-used what our caller above gave us for that dim:
-        if (dim != splitDim) {
+        if (dim != splitDim && slices[dim] != null) {
           rightSlices[dim].writer.destroy();
         }
       }
       parentSplits[splitDim]--;
+      if (deleteSplitDim) {
+        slices[splitDim].writer.destroy();
+      }
+    }
+  }
+
+  private void createPathSlice(PathSlice[] slices, int dim) throws IOException{
+    assert slices[dim] == null;
+    PathSlice current = null;
+    for(PathSlice slice : slices) {
+      if (slice != null) {
+        current = slice;
+        break;
+      }
+    }
+    if (current == null) {
+      slices[dim] =  new PathSlice(sort(dim), 0, pointCount);
+    } else {
+      slices[dim] =  new PathSlice(sort(dim, current.writer, current.start, current.count), 0, current.count);
     }
   }