PHOENIX-5123 Avoid using MappedByteBuffers for server side GROUP BY.
authorLars Hofhansl <larsh@apache.org>
Wed, 6 Feb 2019 21:27:26 +0000 (13:27 -0800)
committerLars Hofhansl <larsh@apache.org>
Wed, 6 Feb 2019 21:27:26 +0000 (13:27 -0800)
phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillFile.java
phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java

index 51aef98..a47cfdf 100644 (file)
@@ -51,27 +51,23 @@ public class SpillFile implements Closeable {
     private Map<Integer, TempFile> tempFiles;
     // Custom spill files directory
     private File spillFilesDirectory = null;
-    
+
     // Wrapper class for a TempFile: File + RandomAccessFile
-    private static class TempFile implements Closeable{
-       private RandomAccessFile rndFile;
-       private File file;
-       
-       public TempFile(File file, RandomAccessFile rndFile) {
-               this.file = file;
-               this.rndFile = rndFile;
-       }       
-               
-       public FileChannel getChannel() {
-               return rndFile.getChannel();
-       }
+    private static class TempFile implements Closeable {
+        private final RandomAccessFile rndFile;
+        private final File file;
+
+        public TempFile(File file, RandomAccessFile rndFile) {
+            this.file = file;
+            this.rndFile = rndFile;
+        }
+
+        @Override
+        public void close() throws IOException {
+            Closeables.closeQuietly(rndFile.getChannel());
+            Closeables.closeQuietly(rndFile);
 
-               @Override
-               public void close() throws IOException {
-                       Closeables.closeQuietly(rndFile.getChannel());
-                       Closeables.closeQuietly(rndFile);
-                       
-                       if (file != null) {
+            if (file != null) {
                 if (logger.isDebugEnabled()) {
                     logger.debug("Deleting tempFile: " + file.getAbsolutePath());
                 }
@@ -79,9 +75,9 @@ public class SpillFile implements Closeable {
                     file.delete();
                 } catch (SecurityException e) {
                     logger.warn("IOException thrown while closing Closeable." + e);
-               }
+                }
             }
-               }
+        }
     }
 
     private SpillFile(File spillFilesDirectory) throws IOException {
@@ -120,29 +116,29 @@ public class SpillFile implements Closeable {
     /**
      * Random access to a page of the current spill file
      * @param index
+     * @return a file seeked to the correct page
      */
-    public MappedByteBuffer getPage(int index) {
+    public RandomAccessFile getPage(int index) {
         try {
-               TempFile tempFile = null;
-               int fileIndex = 0;
-               
-            long offset = (long) index * (long) DEFAULT_PAGE_SIZE;            
-            if(offset >= SPILL_FILE_SIZE) {
-               // Offset exceeds the first SpillFile size
-               // Get the index of the file that should contain the pageID
-               fileIndex = (int)(offset / SPILL_FILE_SIZE);
-               if(!tempFiles.containsKey(fileIndex)) {
-                       // Dynamically add new spillFiles if directory grows beyond 
-                       // max page ID.
-                       tempFile = createTempFile();
-                       tempFiles.put(fileIndex, tempFile);
-               }
-            }
-               tempFile = tempFiles.get(fileIndex);
-               // Channel gets buffered in file object
-               FileChannel fc = tempFile.getChannel();
+            TempFile tempFile = null;
+            int fileIndex = 0;
 
-               return fc.map(MapMode.READ_WRITE, offset, DEFAULT_PAGE_SIZE);
+            long offset = (long) index * (long) DEFAULT_PAGE_SIZE;
+            if (offset >= SPILL_FILE_SIZE) {
+                // Offset exceeds the first SpillFile size
+                // Get the index of the file that should contain the pageID
+                fileIndex = (int) (offset / SPILL_FILE_SIZE);
+                if (!tempFiles.containsKey(fileIndex)) {
+                    // Dynamically add new spillFiles if directory grows beyond
+                    // max page ID.
+                    tempFile = createTempFile();
+                    tempFiles.put(fileIndex, tempFile);
+                }
+            }
+            tempFile = tempFiles.get(fileIndex);
+            RandomAccessFile file = tempFile.rndFile;
+            file.seek(offset);
+            return file;
         } catch (IOException ioe) {
             // Close resource
             close();
index bb4ce2e..cff1e44 100644 (file)
@@ -19,8 +19,8 @@
 package org.apache.phoenix.cache.aggcache;
 
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.BufferOverflowException;
-import java.nio.MappedByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -55,7 +55,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
     private int curMapBufferIndex;
     private SpillFile spillFile;
     // Directory of hash buckets --> extendible hashing implementation
-    private MappedByteBufferMap[] directory;
+    private FileMap[] directory;
     private final SpillableGroupByCache.QueryCache cache;
 
     public SpillMap(SpillFile file, int thresholdBytes, int estValueSize, SpillableGroupByCache.QueryCache cache)
@@ -67,11 +67,11 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
 
         // Init the e-hashing directory structure
         globalDepth = 1;
-        directory = new MappedByteBufferMap[(1 << globalDepth)];
+        directory = new FileMap[(1 << globalDepth)];
 
         for (int i = 0; i < directory.length; i++) {
             // Create an empty bucket list
-            directory[i] = new MappedByteBufferMap(i, this.thresholdBytes, pageInserts, file);
+            directory[i] = new FileMap(i, this.thresholdBytes, pageInserts, file);
             directory[i].flushBuffer();
         }
         directory[0].pageIn();
@@ -93,7 +93,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
     // for bucket splits
     private void redistribute(int index, ImmutableBytesPtr keyNew, byte[] valueNew) {
         // Get the respective bucket
-        MappedByteBufferMap byteMap = directory[index];
+        FileMap byteMap = directory[index];
 
         // Get the actual bucket index, that the directory index points to
         int mappedIdx = byteMap.pageIndex;
@@ -119,8 +119,8 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
         int b2Index = Math.max(index, tmpIndex);
 
         // Create two new split buckets
-        MappedByteBufferMap b1 = new MappedByteBufferMap(b1Index, thresholdBytes, pageInserts, spillFile);
-        MappedByteBufferMap b2 = new MappedByteBufferMap(b2Index, thresholdBytes, pageInserts, spillFile);
+        FileMap b1 = new FileMap(b1Index, thresholdBytes, pageInserts, spillFile);
+        FileMap b2 = new FileMap(b2Index, thresholdBytes, pageInserts, spillFile);
 
         // redistribute old elements into b1 and b2
         for (Entry<ImmutableBytesPtr, byte[]> element : byteMap.pageMap.entrySet()) {
@@ -182,7 +182,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
         Preconditions.checkArgument(newDirSize < Integer.MAX_VALUE);
 
         // Double it!
-        MappedByteBufferMap[] newDirectory = new MappedByteBufferMap[newDirSize];
+        FileMap[] newDirectory = new FileMap[newDirSize];
         for (int i = 0; i < directory.length; i++) {
             newDirectory[i] = directory[i];
             newDirectory[i + directory.length] = directory[i];
@@ -212,12 +212,12 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
         byte[] value = null;
 
         int bucketIndex = getBucketIndex(ikey);
-        MappedByteBufferMap byteMap = directory[bucketIndex];
+        FileMap byteMap = directory[bucketIndex];
 
         // Decision based on bucket ID, not the directory ID due to the n:1 relationship
         if (directory[curMapBufferIndex].pageIndex != byteMap.pageIndex) {
             // map not paged in
-            MappedByteBufferMap curByteMap = directory[curMapBufferIndex];
+            FileMap curByteMap = directory[curMapBufferIndex];
 
             // Use bloomFilter to check if key was spilled before
             if (byteMap.containsKey(ikey.copyBytesIfNecessary())) {
@@ -240,10 +240,10 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
     private byte[] getAlways(ImmutableBytesPtr key) {
         byte[] value = null;
         int bucketIndex = getBucketIndex(key);
-        MappedByteBufferMap byteMap = directory[bucketIndex];
+        FileMap byteMap = directory[bucketIndex];
 
         if (directory[curMapBufferIndex].pageIndex != byteMap.pageIndex) {
-            MappedByteBufferMap curByteMap = directory[curMapBufferIndex];
+            FileMap curByteMap = directory[curMapBufferIndex];
             // ensure consistency and flush current memory page to disk
             curByteMap.flushBuffer();
 
@@ -266,7 +266,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
         // page in element and replace if present
         byte[] spilledValue = getAlways(key);
 
-        MappedByteBufferMap byteMap = directory[curMapBufferIndex];
+        FileMap byteMap = directory[curMapBufferIndex];
         int index = curMapBufferIndex;
 
         // TODO: We split buckets until the new element fits onto a
@@ -308,9 +308,9 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
      * page for easy get() and update() calls on an individual key The class keeps track of the current size of the in
      * memory page and handles flushing and paging in respectively
      */
-    private static class MappedByteBufferMap {
-        private SpillFile spillFile;
-        private int pageIndex;
+    private static class FileMap {
+        private final SpillFile spillFile;
+        private final int pageIndex;
         private final int thresholdBytes;
         private long totalResultSize;
         private boolean pagedIn;
@@ -323,7 +323,7 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
         // Used to determine is an element was written to this page before or not
         BloomFilter<byte[]> bFilter;
 
-        public MappedByteBufferMap(int id, int thresholdBytes, int pageInserts, SpillFile spillFile) {
+        public FileMap(int id, int thresholdBytes, int pageInserts, SpillFile spillFile) {
             this.spillFile = spillFile;
             // size threshold of a page
             this.thresholdBytes = thresholdBytes;
@@ -363,24 +363,33 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
         }
 
         // Flush the current page to the memory mapped byte buffer
-        private void flushBuffer() throws BufferOverflowException {
+        private void flushBuffer() {
             if (pagedIn) {
-                MappedByteBuffer buffer;
                 // Only flush if page was changed
                 if (dirtyPage) {
                     Collection<byte[]> values = pageMap.values();
-                    buffer = spillFile.getPage(pageIndex);
-                    buffer.clear();
+                    RandomAccessFile file = spillFile.getPage(pageIndex);
                     // number of elements
-                    buffer.putInt(values.size());
-                    for (byte[] value : values) {
-                        // element length
-                        buffer.putInt(value.length);
-                        // element
-                        buffer.put(value, 0, value.length);
+                    try {
+                        file.writeInt(values.size());
+                        int written = Bytes.SIZEOF_INT;
+                        for (byte[] value : values) {
+                            written += Bytes.SIZEOF_INT + value.length;
+                            // safety check
+                            if (written > SpillFile.DEFAULT_PAGE_SIZE) {
+                                throw new BufferOverflowException();
+                            }
+                            // element length
+                            file.writeInt(value.length);
+                            // element
+                            file.write(value, 0, value.length);
+                        }
+                    } catch (IOException ioe) {
+                        // Error during key access on spilled resource
+                        // TODO rework error handling
+                        throw new RuntimeException(ioe);
                     }
                 }
-                buffer = null;
                 // Reset page stats
                 pageMap.clear();
                 totalResultSize = 0;
@@ -389,24 +398,23 @@ public class SpillMap extends AbstractMap<ImmutableBytesPtr, byte[]> implements
             dirtyPage = false;
         }
 
-        // load memory mapped region into a map for fast element access
-        private void pageIn() throws IndexOutOfBoundsException {
+        // load a page into a map for fast element access
+        private void pageIn() {
             if (!pagedIn) {
-                // Map the memory region
-                MappedByteBuffer buffer = spillFile.getPage(pageIndex);
-                int numElements = buffer.getInt();
+                RandomAccessFile file = spillFile.getPage(pageIndex);
+                try {
+                int numElements = file.readInt();
                 for (int i = 0; i < numElements; i++) {
-                    int kvSize = buffer.getInt();
+                    int kvSize = file.readInt();
                     byte[] data = new byte[kvSize];
-                    buffer.get(data, 0, kvSize);
-                    try {
-                        pageMap.put(SpillManager.getKey(data), data);
-                        totalResultSize += (data.length + Bytes.SIZEOF_INT);
-                    } catch (IOException ioe) {
-                        // Error during key access on spilled resource
-                        // TODO rework error handling
-                        throw new RuntimeException(ioe);
-                    }
+                    file.readFully(data);
+                    pageMap.put(SpillManager.getKey(data), data);
+                    totalResultSize += (data.length + Bytes.SIZEOF_INT);
+                }
+                } catch (IOException ioe) {
+                    // Error during key access on spilled resource
+                    // TODO rework error handling
+                    throw new RuntimeException(ioe);
                 }
                 pagedIn = true;
                 dirtyPage = false;