PHOENIX-5120 Avoid using MappedByteBuffers for server side sorting.
authorLars Hofhansl <larsh@apache.org>
Tue, 5 Feb 2019 04:38:02 +0000 (20:38 -0800)
committerLars Hofhansl <larsh@apache.org>
Tue, 5 Feb 2019 04:38:02 +0000 (20:38 -0800)
phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java [new file with mode: 0644]
phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java [moved from phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java with 70% similarity]
phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java [moved from phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java with 74% similarity]
phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
new file mode 100644 (file)
index 0000000..c5eeaff
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.util.Map;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+
+import com.google.common.collect.Maps;
+
+public class OrderByWithSpillingIT extends OrderByIT {
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        // do lot's of spooling!
+        props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+}
index b23db35..aacea23 100644 (file)
@@ -20,8 +20,9 @@ package org.apache.phoenix.execute;
 import static org.apache.phoenix.util.NumberUtil.add;
 import static org.apache.phoenix.util.NumberUtil.getMin;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.MappedByteBuffer;
 import java.sql.ParameterMetaData;
 import java.sql.SQLException;
 import java.util.Collections;
@@ -50,7 +51,7 @@ import org.apache.phoenix.execute.visitor.ByteCountVisitor;
 import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
-import org.apache.phoenix.iterate.MappedByteBufferQueue;
+import org.apache.phoenix.iterate.BufferedQueue;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
@@ -293,7 +294,7 @@ public class SortMergeJoinPlan implements QueryPlan {
         private ValueBitSet lhsBitSet;
         private ValueBitSet rhsBitSet;
         private byte[] emptyProjectedValue;
-        private MappedByteBufferTupleQueue queue;
+        private BufferedTupleQueue queue;
         private Iterator<Tuple> queueIterator;
         
         public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) {
@@ -315,7 +316,7 @@ public class SortMergeJoinPlan implements QueryPlan {
             int len = lhsBitSet.getEstimatedLength();
             this.emptyProjectedValue = new byte[len];
             lhsBitSet.toBytes(emptyProjectedValue, 0);
-            this.queue = new MappedByteBufferTupleQueue(thresholdBytes);
+            this.queue = new BufferedTupleQueue(thresholdBytes);
             this.queueIterator = null;
         }
         
@@ -609,24 +610,24 @@ public class SortMergeJoinPlan implements QueryPlan {
         }
     }
     
-    private static class MappedByteBufferTupleQueue extends MappedByteBufferQueue<Tuple> {
+    private static class BufferedTupleQueue extends BufferedQueue<Tuple> {
 
-        public MappedByteBufferTupleQueue(int thresholdBytes) {
+        public BufferedTupleQueue(int thresholdBytes) {
             super(thresholdBytes);
         }
 
         @Override
-        protected MappedByteBufferSegmentQueue<Tuple> createSegmentQueue(
+        protected BufferedSegmentQueue<Tuple> createSegmentQueue(
                 int index, int thresholdBytes) {
-            return new MappedByteBufferTupleSegmentQueue(index, thresholdBytes, false);
+            return new BufferedTupleSegmentQueue(index, thresholdBytes, false);
         }
 
         @Override
-        protected Comparator<MappedByteBufferSegmentQueue<Tuple>> getSegmentQueueComparator() {
-            return new Comparator<MappedByteBufferSegmentQueue<Tuple>>() {
+        protected Comparator<BufferedSegmentQueue<Tuple>> getSegmentQueueComparator() {
+            return new Comparator<BufferedSegmentQueue<Tuple>>() {
                 @Override
-                public int compare(MappedByteBufferSegmentQueue<Tuple> q1, 
-                        MappedByteBufferSegmentQueue<Tuple> q2) {
+                public int compare(BufferedSegmentQueue<Tuple> q1,
+                        BufferedSegmentQueue<Tuple> q2) {
                     return q1.index() - q2.index();
                 }                
             };
@@ -635,7 +636,7 @@ public class SortMergeJoinPlan implements QueryPlan {
         @Override
         public Iterator<Tuple> iterator() {
             return new Iterator<Tuple>() {
-                private Iterator<MappedByteBufferSegmentQueue<Tuple>> queueIter;
+                private Iterator<BufferedSegmentQueue<Tuple>> queueIter;
                 private Iterator<Tuple> currentIter;
                 {
                     this.queueIter = getSegmentQueues().iterator();
@@ -668,10 +669,10 @@ public class SortMergeJoinPlan implements QueryPlan {
             };
         }
         
-        private static class MappedByteBufferTupleSegmentQueue extends MappedByteBufferSegmentQueue<Tuple> {
+        private static class BufferedTupleSegmentQueue extends BufferedSegmentQueue<Tuple> {
             private LinkedList<Tuple> results;
             
-            public MappedByteBufferTupleSegmentQueue(int index,
+            public BufferedTupleSegmentQueue(int index,
                     int thresholdBytes, boolean hasMaxQueueSize) {
                 super(index, thresholdBytes, hasMaxQueueSize);
                 this.results = Lists.newLinkedList();
@@ -688,23 +689,22 @@ public class SortMergeJoinPlan implements QueryPlan {
                 return Bytes.SIZEOF_INT * 2 + kv.getLength();
             }
 
-            @SuppressWarnings("deprecation")
             @Override
-            protected void writeToBuffer(MappedByteBuffer buffer, Tuple e) {
+            protected void writeToStream(DataOutputStream out, Tuple e) throws IOException {
                 KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
-                buffer.putInt(kv.getLength() + Bytes.SIZEOF_INT);
-                buffer.putInt(kv.getLength());
-                buffer.put(kv.getBuffer(), kv.getOffset(), kv.getLength());
+                out.writeInt(kv.getLength() + Bytes.SIZEOF_INT);
+                out.writeInt(kv.getLength());
+                out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
             }
 
             @Override
-            protected Tuple readFromBuffer(MappedByteBuffer buffer) {
-                int length = buffer.getInt();
+            protected Tuple readFromStream(DataInputStream in) throws IOException {
+                int length = in.readInt();
                 if (length < 0)
                     return null;
                 
                 byte[] b = new byte[length];
-                buffer.get(b);
+                in.read(b);
                 Result result = ResultUtil.toResult(new ImmutableBytesWritable(b));
                 return new ResultTuple(result);
             }
  */
 package org.apache.phoenix.iterate;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
 import java.util.AbstractQueue;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -34,26 +36,26 @@ import java.util.UUID;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MinMaxPriorityQueue;
 
-public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
+public abstract class BufferedQueue<T> extends AbstractQueue<T> {
     private final int thresholdBytes;
-    private List<MappedByteBufferSegmentQueue<T>> queues;
+    private List<BufferedSegmentQueue<T>> queues;
     private int currentIndex;
-    private MappedByteBufferSegmentQueue<T> currentQueue;
-    private MinMaxPriorityQueue<MappedByteBufferSegmentQueue<T>> mergedQueue;
+    private BufferedSegmentQueue<T> currentQueue;
+    private MinMaxPriorityQueue<BufferedSegmentQueue<T>> mergedQueue;
 
-    public MappedByteBufferQueue(int thresholdBytes) {
+    public BufferedQueue(int thresholdBytes) {
         this.thresholdBytes = thresholdBytes;
-        this.queues = Lists.<MappedByteBufferSegmentQueue<T>> newArrayList();
+        this.queues = Lists.<BufferedSegmentQueue<T>> newArrayList();
         this.currentIndex = -1;
         this.currentQueue = null;
         this.mergedQueue = null;
     }
     
-    abstract protected MappedByteBufferSegmentQueue<T> createSegmentQueue(int index, int thresholdBytes);
+    abstract protected BufferedSegmentQueue<T> createSegmentQueue(int index, int thresholdBytes);
     
-    abstract protected Comparator<MappedByteBufferSegmentQueue<T>> getSegmentQueueComparator();
+    abstract protected Comparator<BufferedSegmentQueue<T>> getSegmentQueueComparator();
     
-    protected final List<MappedByteBufferSegmentQueue<T>> getSegmentQueues() {
+    protected final List<BufferedSegmentQueue<T>> getSegmentQueues() {
         return queues.subList(0, currentIndex + 1);
     }
 
@@ -77,7 +79,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
     public T poll() {
         initMergedQueue();
         if (mergedQueue != null && !mergedQueue.isEmpty()) {
-            MappedByteBufferSegmentQueue<T> queue = mergedQueue.poll();
+            BufferedSegmentQueue<T> queue = mergedQueue.poll();
             T re = queue.poll();
             if (queue.peek() != null) {
                 mergedQueue.add(queue);
@@ -98,7 +100,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
     
     @Override
     public void clear() {
-        for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) {
+        for (BufferedSegmentQueue<T> queue : getSegmentQueues()) {
             queue.clear();
         }
         currentIndex = -1;
@@ -114,7 +116,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
     @Override
     public int size() {
         int size = 0;
-        for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) {
+        for (BufferedSegmentQueue<T> queue : getSegmentQueues()) {
             size += queue.size();
         }
         return size;
@@ -125,7 +127,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
     }
 
     public void close() {
-        for (MappedByteBufferSegmentQueue<T> queue : queues) {
+        for (BufferedSegmentQueue<T> queue : queues) {
             queue.close();
         }
         queues.clear();
@@ -133,9 +135,9 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
     
     private void initMergedQueue() {
         if (mergedQueue == null && currentIndex >= 0) {
-            mergedQueue = MinMaxPriorityQueue.<MappedByteBufferSegmentQueue<T>> orderedBy(
+            mergedQueue = MinMaxPriorityQueue.<BufferedSegmentQueue<T>> orderedBy(
                     getSegmentQueueComparator()).maximumSize(currentIndex + 1).create();
-            for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) {
+            for (BufferedSegmentQueue<T> queue : getSegmentQueues()) {
                 T re = queue.peek();
                 if (re != null) {
                     mergedQueue.add(queue);
@@ -144,17 +146,14 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
         }        
     }
 
-    public abstract static class MappedByteBufferSegmentQueue<T> extends AbstractQueue<T> {
+    public abstract static class BufferedSegmentQueue<T> extends AbstractQueue<T> {
         protected static final int EOF = -1;
-        // at least create 128 KB MappedByteBuffers
-        private static final long DEFAULT_MAPPING_SIZE = 128 * 1024;
         
         private final int index;
         private final int thresholdBytes;
         private final boolean hasMaxQueueSize;
         private long totalResultSize = 0;
         private int maxResultSize = 0;
-        private long mappingSize = 0;
         private File file;
         private boolean isClosed = false;
         private boolean flushBuffer = false;
@@ -164,7 +163,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
         // iterators to close on close()
         private List<SegmentQueueFileIterator> iterators;
 
-        public MappedByteBufferSegmentQueue(int index, int thresholdBytes, boolean hasMaxQueueSize) {
+        public BufferedSegmentQueue(int index, int thresholdBytes, boolean hasMaxQueueSize) {
             this.index = index;
             this.thresholdBytes = thresholdBytes;
             this.hasMaxQueueSize = hasMaxQueueSize;
@@ -173,8 +172,8 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
         
         abstract protected Queue<T> getInMemoryQueue();
         abstract protected int sizeOf(T e);
-        abstract protected void writeToBuffer(MappedByteBuffer buffer, T e);
-        abstract protected T readFromBuffer(MappedByteBuffer buffer);
+        abstract protected void writeToStream(DataOutputStream out, T e) throws IOException;
+        abstract protected T readFromStream(DataInputStream in) throws IOException;
         
         public int index() {
             return this.index;
@@ -253,7 +252,6 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
             getInMemoryQueue().clear();
             this.totalResultSize = 0;
             this.maxResultSize = 0;
-            this.mappingSize = 0;
             this.flushBuffer = false;
             this.flushedCount = 0;
             this.current = null;
@@ -303,38 +301,25 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
             totalResultSize = hasMaxQueueSize ? maxResultSize * inMemQueue.size() : (totalResultSize + resultSize);
             if (totalResultSize >= thresholdBytes) {
                 this.file = File.createTempFile(UUID.randomUUID().toString(), null);
-                RandomAccessFile af = new RandomAccessFile(file, "rw");
-                FileChannel fc = af.getChannel();
-                int writeIndex = 0;
-                mappingSize = Math.min(Math.max(maxResultSize, DEFAULT_MAPPING_SIZE), totalResultSize);
-                MappedByteBuffer writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
-
-                int resSize = inMemQueue.size();
-                for (int i = 0; i < resSize; i++) {                
-                    T e = inMemQueue.poll();
-                    writeToBuffer(writeBuffer, e);
-                    // buffer close to exhausted, re-map.
-                    if (mappingSize - writeBuffer.position() < maxResultSize) {
-                        writeIndex += writeBuffer.position();
-                        writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
+                try (DataOutputStream out = new DataOutputStream(
+                        new BufferedOutputStream(new FileOutputStream(file)))) {
+                    int resSize = inMemQueue.size();
+                    for (int i = 0; i < resSize; i++) {
+                        T e = inMemQueue.poll();
+                        writeToStream(out, e);
                     }
+                    out.writeInt(EOF); // end
+                    flushedCount = resSize;
+                    inMemQueue.clear();
+                    flushBuffer = true;
                 }
-                writeBuffer.putInt(EOF); // end
-                fc.force(true);
-                fc.close();
-                af.close();
-                flushedCount = resSize;
-                inMemQueue.clear();
-                flushBuffer = true;
             }
         }
         
         private class SegmentQueueFileIterator implements Iterator<T>, Closeable {
             private boolean isEnd;
             private long readIndex;
-            private RandomAccessFile af;
-            private FileChannel fc;
-            private MappedByteBuffer readBuffer;
+            private DataInputStream in;
             private T next;
             
             public SegmentQueueFileIterator() {
@@ -354,9 +339,8 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
                 this.readIndex = readIndex;
                 this.next = null;
                 try {
-                    this.af = new RandomAccessFile(file, "r");
-                    this.fc = af.getChannel();
-                    this.readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
+                    this.in = new DataInputStream(
+                            new BufferedInputStream(new FileInputStream(file)));
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
@@ -384,23 +368,17 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
             private T readNext() {
                 if (isEnd)
                     return null;
-                
-                T e = readFromBuffer(readBuffer);
+
+                T e = null;
+                try {
+                    e = readFromStream(in);
+                } catch (IOException ex) {
+                  throw new RuntimeException(ex);
+                }
                 if (e == null) {
                     close();
                     return null;
                 }
-                
-                // buffer close to exhausted, re-map.
-                if (mappingSize - readBuffer.position() < maxResultSize) {
-                    readIndex += readBuffer.position();
-                    try {
-                        readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
-                    } catch (IOException ex) {
-                        throw new RuntimeException(ex);
-                    }
-                }
-                
                 return e;
             }
 
@@ -412,18 +390,9 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
             @Override
             public void close() {
                 this.isEnd = true;
-                if (this.fc != null) {
-                    try {
-                        this.fc.close();
-                    } catch (IOException ignored) {
-                    }
-                }
-                if (this.af != null) {
-                    try {
-                        this.af.close();
-                    } catch (IOException ignored) {
-                    }
-                    this.af = null;
+                try {
+                    this.in.close();
+                } catch (IOException ignored) {
                 }
             }
         }
@@ -17,8 +17,9 @@
  */
 package org.apache.phoenix.iterate;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.MappedByteBuffer;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
@@ -36,11 +37,11 @@ import org.apache.phoenix.util.ResultUtil;
 
 import com.google.common.collect.MinMaxPriorityQueue;
 
-public class MappedByteBufferSortedQueue extends MappedByteBufferQueue<ResultEntry> {
+public class BufferedSortedQueue extends BufferedQueue<ResultEntry> {
     private Comparator<ResultEntry> comparator;
     private final int limit;
 
-    public MappedByteBufferSortedQueue(Comparator<ResultEntry> comparator,
+    public BufferedSortedQueue(Comparator<ResultEntry> comparator,
             Integer limit, int thresholdBytes) throws IOException {
         super(thresholdBytes);
         this.comparator = comparator;
@@ -48,25 +49,25 @@ public class MappedByteBufferSortedQueue extends MappedByteBufferQueue<ResultEnt
     }
 
     @Override
-    protected org.apache.phoenix.iterate.MappedByteBufferQueue.MappedByteBufferSegmentQueue<ResultEntry> createSegmentQueue(
+    protected BufferedSegmentQueue<ResultEntry> createSegmentQueue(
             int index, int thresholdBytes) {
-        return new MappedByteBufferResultEntryPriorityQueue(index, thresholdBytes, limit, comparator);
+        return new BufferedResultEntryPriorityQueue(index, thresholdBytes, limit, comparator);
     }
 
     @Override
-    protected Comparator<org.apache.phoenix.iterate.MappedByteBufferQueue.MappedByteBufferSegmentQueue<ResultEntry>> getSegmentQueueComparator() {
-        return new Comparator<MappedByteBufferSegmentQueue<ResultEntry>>() {
+    protected Comparator<BufferedSegmentQueue<ResultEntry>> getSegmentQueueComparator() {
+        return new Comparator<BufferedSegmentQueue<ResultEntry>>() {
             @Override
-            public int compare(MappedByteBufferSegmentQueue<ResultEntry> q1,
-                    MappedByteBufferSegmentQueue<ResultEntry> q2) {
+            public int compare(BufferedSegmentQueue<ResultEntry> q1,
+                    BufferedSegmentQueue<ResultEntry> q2) {
                 return comparator.compare(q1.peek(), q2.peek());
             }};
     }
 
-    private static class MappedByteBufferResultEntryPriorityQueue extends MappedByteBufferSegmentQueue<ResultEntry> {          
+    private static class BufferedResultEntryPriorityQueue extends BufferedSegmentQueue<ResultEntry> {
         private MinMaxPriorityQueue<ResultEntry> results = null;
         
-       public MappedByteBufferResultEntryPriorityQueue(int index,
+        public BufferedResultEntryPriorityQueue(int index,
                 int thresholdBytes, int limit, Comparator<ResultEntry> comparator) {
             super(index, thresholdBytes, limit >= 0);
             this.results = limit < 0 ? 
@@ -85,54 +86,54 @@ public class MappedByteBufferSortedQueue extends MappedByteBufferQueue<ResultEnt
         }
 
         @Override
-        protected void writeToBuffer(MappedByteBuffer buffer, ResultEntry e) {
+        protected void writeToStream(DataOutputStream os, ResultEntry e) throws IOException {
             int totalLen = 0;
             List<KeyValue> keyValues = toKeyValues(e);
             for (KeyValue kv : keyValues) {
                 totalLen += (kv.getLength() + Bytes.SIZEOF_INT);
             }
-            buffer.putInt(totalLen);
+            os.writeInt(totalLen);
             for (KeyValue kv : keyValues) {
-                buffer.putInt(kv.getLength());
-                buffer.put(kv.getBuffer(), kv.getOffset(), kv
+                os.writeInt(kv.getLength());
+                os.write(kv.getBuffer(), kv.getOffset(), kv
                         .getLength());
             }
             ImmutableBytesWritable[] sortKeys = e.sortKeys;
-            buffer.putInt(sortKeys.length);
+            os.writeInt(sortKeys.length);
             for (ImmutableBytesWritable sortKey : sortKeys) {
                 if (sortKey != null) {
-                    buffer.putInt(sortKey.getLength());
-                    buffer.put(sortKey.get(), sortKey.getOffset(),
+                    os.writeInt(sortKey.getLength());
+                    os.write(sortKey.get(), sortKey.getOffset(),
                             sortKey.getLength());
                 } else {
-                    buffer.putInt(0);
+                    os.writeInt(0);
                 }
             }
         }
 
         @Override
-        protected ResultEntry readFromBuffer(MappedByteBuffer buffer) {            
-            int length = buffer.getInt();
+        protected ResultEntry readFromStream(DataInputStream is) throws IOException {
+            int length = is.readInt();
             if (length < 0)
                 return null;
-            
+
             byte[] rb = new byte[length];
-            buffer.get(rb);
+            is.read(rb);
             Result result = ResultUtil.toResult(new ImmutableBytesWritable(rb));
             ResultTuple rt = new ResultTuple(result);
-            int sortKeySize = buffer.getInt();
+            int sortKeySize = is.readInt();
             ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[sortKeySize];
             for (int i = 0; i < sortKeySize; i++) {
-                int contentLength = buffer.getInt();
+                int contentLength = is.readInt();
                 if (contentLength > 0) {
                     byte[] sortKeyContent = new byte[contentLength];
-                    buffer.get(sortKeyContent);
+                    is.read(sortKeyContent);
                     sortKeys[i] = new ImmutableBytesWritable(sortKeyContent);
                 } else {
                     sortKeys[i] = null;
                 }
             }
-            
+
             return new ResultEntry(sortKeys, rt);
         }
 
index 36b274a..22712ff 100644 (file)
@@ -208,7 +208,7 @@ public class OrderedResultIterator implements PeekingResultIterator {
         List<Expression> expressions = Lists.newArrayList(Collections2.transform(orderByExpressions, TO_EXPRESSION));
         final Comparator<ResultEntry> comparator = buildComparator(orderByExpressions);
         try{
-            final MappedByteBufferSortedQueue queueEntries = new MappedByteBufferSortedQueue(comparator, limit,
+            final BufferedSortedQueue queueEntries = new BufferedSortedQueue(comparator, limit,
                     thresholdBytes);
             resultIterator = new PeekingResultIterator() {
                 int count = 0;