IGNITE-10812: SQL: split classes responsible for distributed joins. This closes ...
author devozerov <vozerov@gridgain.com>
Wed, 26 Dec 2018 15:23:56 +0000 (18:23 +0300)
committer devozerov <vozerov@gridgain.com>
Wed, 26 Dec 2018 15:23:56 +0000 (18:23 +0300)
23 files changed:
modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeClientIndex.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextKey.java [new file with mode: 0644]
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java [new file with mode: 0644]
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CollocationModel.java [moved from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java with 92% similarity]
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CursorIteratorWrapper.java [new file with mode: 0644]
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedJoinMode.java [moved from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java with 96% similarity]
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java [new file with mode: 0644]
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java [new file with mode: 0644]
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java [new file with mode: 0644]
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SegmentKey.java [new file with mode: 0644]
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SourceKey.java [new file with mode: 0644]
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/UnicastCursor.java [new file with mode: 0644]
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java

index 4d1577b..35e9424 100644 (file)
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.query.h2.H2Cursor;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.util.GridCursorIteratorWrapper;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
@@ -154,7 +155,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
     }
 
     /** {@inheritDoc} */
-    @Override protected int segmentsCount() {
+    @Override public int segmentsCount() {
         return segments.length;
     }
 
@@ -335,7 +336,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
     @SuppressWarnings("unchecked")
     private GridCursor<GridH2Row> rowIterator(Iterator<SpatialKey> i, TableFilter filter) {
         if (!i.hasNext())
-            return EMPTY_CURSOR;
+            return H2Utils.EMPTY_CURSOR;
 
         long time = System.currentTimeMillis();
 
index c542758..2a5f33c 100644 (file)
@@ -45,15 +45,22 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
 import org.apache.ignite.internal.util.GridStringBuilder;
+import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.engine.Session;
 import org.h2.jdbc.JdbcConnection;
+import org.h2.result.Row;
 import org.h2.result.SortOrder;
 import org.h2.table.IndexColumn;
 import org.h2.util.LocalDateTimeUtils;
@@ -78,6 +85,8 @@ import org.h2.value.ValueTime;
 import org.h2.value.ValueTimestamp;
 import org.h2.value.ValueUuid;
 
+import javax.cache.CacheException;
+
 import static org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
 import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
 import static org.apache.ignite.internal.processors.query.QueryUtils.VER_FIELD_NAME;
@@ -94,6 +103,19 @@ public class H2Utils {
     /** Quotation character. */
     private static final char ESC_CH = '\"';
 
+    /** Empty cursor: {@code next()} always returns {@code false} and {@code get()} returns {@code null}. */
+    public static final GridCursor<GridH2Row> EMPTY_CURSOR = new GridCursor<GridH2Row>() {
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridH2Row get() {
+            return null;
+        }
+    };
+
     /**
      * @param c1 First column.
      * @param c2 Second column.
@@ -598,4 +620,44 @@ public class H2Utils {
 
         return qry;
     }
+
+    /**
+     * @param row Row to convert into a message, may be {@code null}.
+     * @return Row message holding one value message per column, or {@code null} if {@code row} is {@code null}.
+     */
+    public static GridH2RowMessage toRowMessage(Row row) {
+        if (row == null)
+            return null;
+
+        int cols = row.getColumnCount();
+
+        assert cols > 0 : cols;
+
+        List<GridH2ValueMessage> vals = new ArrayList<>(cols);
+
+        for (int i = 0; i < cols; i++) {
+            try {
+                vals.add(GridH2ValueMessageFactory.toMessage(row.getValue(i)));
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        GridH2RowMessage res = new GridH2RowMessage();
+
+        res.values(vals);
+
+        return res;
+    }
+
+    /**
+     * Creates a retry exception for a distributed join.
+     *
+     * @param msg Message passed to the exception constructor.
+     * @return New {@link GridH2RetryException} created with the given message.
+     */
+    public static GridH2RetryException retryException(String msg) {
+        return new GridH2RetryException(msg);
+    }
 }
index d1b435d..5f692e9 100644 (file)
@@ -177,8 +177,8 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.txStart
 import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT;
 import static org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_CACHE_ID;
 import static org.apache.ignite.internal.processors.query.h2.PreparedStatementEx.MVCC_STATE;
-import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
-import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
+import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
 
index ef6d5ff..9a42362 100644 (file)
@@ -82,7 +82,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
     }
 
     /** {@inheritDoc} */
-    @Override protected int segmentsCount() {
+    @Override public int segmentsCount() {
         return 1;
     }
 
index a0bab43..df896ed 100644 (file)
@@ -68,7 +68,7 @@ public class H2TreeClientIndex extends H2TreeIndexBase {
     }
 
     /** {@inheritDoc} */
-    @Override protected int segmentsCount() {
+    @Override public int segmentsCount() {
         throw SHOULDNT_BE_INVOKED_EXCEPTION;
     }
 
index 40d0a9f..6b09b76 100644 (file)
@@ -40,8 +40,6 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.stat.IoStatisticsHolder;
 import org.apache.ignite.internal.stat.IoStatisticsType;
-import org.apache.ignite.internal.util.IgniteTree;
-import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
@@ -273,7 +271,7 @@ public class H2TreeIndex extends H2TreeIndexBase {
     }
 
     /** {@inheritDoc} */
-    @Override protected int segmentsCount() {
+    @Override public int segmentsCount() {
         return segments.length;
     }
 
@@ -449,25 +447,6 @@ public class H2TreeIndex extends H2TreeIndexBase {
     }
 
     /** {@inheritDoc} */
-    @Override protected H2Cursor doFind0(
-        IgniteTree t,
-        @Nullable SearchRow first,
-        @Nullable SearchRow last,
-        BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) {
-        try {
-            GridCursor<GridH2Row> range = ((BPlusTree)t).find(first, last, filter, null);
-
-            if (range == null)
-                range = EMPTY_CURSOR;
-
-            return new H2Cursor(range);
-        }
-        catch (IgniteCheckedException e) {
-            throw DbException.convert(e);
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override protected BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter(GridH2QueryContext qctx) {
         if (qctx == null) {
             assert !cctx.mvccEnabled();
index 6c45c29..9547b5f 100644 (file)
 package org.apache.ignite.internal.processors.query.h2.opt;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
 import org.apache.ignite.internal.processors.query.h2.H2Cursor;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.opt.join.CursorIteratorWrapper;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedLookupBatch;
+import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel;
+import org.apache.ignite.internal.processors.query.h2.opt.join.RangeSource;
+import org.apache.ignite.internal.processors.query.h2.opt.join.RangeStream;
+import org.apache.ignite.internal.processors.query.h2.opt.join.SegmentKey;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -41,15 +46,12 @@ import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.CIX2;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.engine.Session;
 import org.h2.index.BaseIndex;
-import org.h2.index.Cursor;
 import org.h2.index.IndexCondition;
 import org.h2.index.IndexLookupBatch;
 import org.h2.index.ViewIndex;
@@ -58,40 +60,26 @@ import org.h2.result.Row;
 import org.h2.result.SearchRow;
 import org.h2.table.IndexColumn;
 import org.h2.table.TableFilter;
-import org.h2.util.DoneFuture;
 import org.h2.value.Value;
-import org.h2.value.ValueNull;
-import org.jetbrains.annotations.Nullable;
 
 import javax.cache.CacheException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
-import static java.util.Collections.emptyIterator;
 import static java.util.Collections.singletonList;
-import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY;
-import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel;
+import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel.buildCollocationModel;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
-import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;
 import static org.h2.result.Row.MEMORY_CALCULATE;
 
 /**
@@ -99,7 +87,7 @@ import static org.h2.result.Row.MEMORY_CALCULATE;
  */
 public abstract class GridH2IndexBase extends BaseIndex {
     /** */
-    private static final Object EXPLICIT_NULL = new Object();
+    public static final Object EXPLICIT_NULL = new Object();
 
     /** */
     private Object msgTopic;
@@ -112,7 +100,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
     /** */
     private final CIX2<ClusterNode,Message> locNodeHnd = new CIX2<ClusterNode,Message>() {
-        @Override public void applyx(ClusterNode clusterNode, Message msg) throws IgniteCheckedException {
+        @Override public void applyx(ClusterNode clusterNode, Message msg) {
             onMessage0(clusterNode.id(), msg);
         }
     };
@@ -122,6 +110,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
     /**
      * @param tbl Table.
      */
+    @SuppressWarnings("MapReplaceableByEnumMap")
     protected final void initDistributedJoinMessaging(GridH2Table tbl) {
         final GridH2RowDescriptor desc = tbl.rowDescriptor();
 
@@ -248,7 +237,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
         // Query expressions can not be distributed as well.
         if (qctx == null || qctx.type() != PREPARE || qctx.distributedJoinMode() == OFF ||
             !ses.isJoinBatchEnabled() || ses.isPreparingQueryExpression())
-            return GridH2CollocationModel.MULTIPLIER_COLLOCATED;
+            return CollocationModel.MULTIPLIER_COLLOCATED;
 
         // We have to clear this cache because normally sub-query plan cost does not depend on anything
         // other than index condition masks and sort order, but in our case it can depend on order
@@ -257,7 +246,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
         assert filters != null;
 
-        GridH2CollocationModel c = buildCollocationModel(qctx, ses.getSubQueryInfo(), filters, filter, false);
+        CollocationModel c = buildCollocationModel(qctx, ses.getSubQueryInfo(), filters, filter, false);
 
         return c.calculateMultiplier();
     }
@@ -327,7 +316,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
         GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
 
-        return new DistributedLookupBatch(cctx, ucast, affColId);
+        return new DistributedLookupBatch(this, cctx, ucast, affColId);
     }
 
     /** {@inheritDoc} */
@@ -344,7 +333,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param nodes Nodes.
      * @param msg Message.
      */
-    private void send(Collection<ClusterNode> nodes, Message msg) {
+    public void send(Collection<ClusterNode> nodes, Message msg) {
         if (!getTable().rowDescriptor().indexing().send(msgTopic,
             -1,
             nodes,
@@ -353,7 +342,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
             locNodeHnd,
             GridIoPolicy.IDX_POOL,
             false))
-            throw retryException("Failed to send message to nodes: " + nodes);
+            throw H2Utils.retryException("Failed to send message to nodes: " + nodes);
     }
 
     /**
@@ -413,7 +402,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
                     // This is the first request containing all the search rows.
                     assert !msg.bounds().isEmpty() : "empty bounds";
 
-                    src = new RangeSource(msg.bounds(), msg.segment(), filter(qctx));
+                    src = new RangeSource(this, msg.bounds(), msg.segment(), filter(qctx));
                 }
                 else {
                     // This is request to fetch next portion of data.
@@ -498,197 +487,6 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * @param v1 First value.
-     * @param v2 Second value.
-     * @return {@code true} If they equal.
-     */
-    private boolean equal(Value v1, Value v2) {
-        return v1 == v2 || (v1 != null && v2 != null && v1.compareTypeSafe(v2, getDatabase().getCompareMode()) == 0);
-    }
-
-    /**
-     * @param qctx Query context.
-     * @param batchLookupId Batch lookup ID.
-     * @param segmentId Segment ID.
-     * @return Index range request.
-     */
-    private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId, int segmentId) {
-        GridH2IndexRangeRequest req = new GridH2IndexRangeRequest();
-
-        req.originNodeId(qctx.originNodeId());
-        req.queryId(qctx.queryId());
-        req.originSegmentId(qctx.segment());
-        req.segment(segmentId);
-        req.batchLookupId(batchLookupId);
-
-        return req;
-    }
-
-
-    /**
-     * @param qctx Query context.
-     * @param cctx Cache context.
-     * @param isLocalQry Local query flag.
-     * @return Collection of nodes for broadcasting.
-     */
-    private List<SegmentKey> broadcastSegments(GridH2QueryContext qctx, GridCacheContext<?, ?> cctx, boolean isLocalQry) {
-        Map<UUID, int[]> partMap = qctx.partitionsMap();
-
-        List<ClusterNode> nodes;
-
-        if (isLocalQry) {
-            if (partMap != null && !partMap.containsKey(cctx.localNodeId()))
-                return Collections.emptyList(); // Prevent remote index call for local queries.
-
-            nodes = Collections.singletonList(cctx.localNode());
-        }
-        else {
-            if (partMap == null)
-                nodes = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
-            else {
-                nodes = new ArrayList<>(partMap.size());
-
-                GridKernalContext ctx = kernalContext();
-
-                for (UUID nodeId : partMap.keySet()) {
-                    ClusterNode node = ctx.discovery().node(nodeId);
-
-                    if (node == null)
-                        throw retryException("Failed to get node by ID during broadcast [nodeId=" + nodeId + ']');
-
-                    nodes.add(node);
-                }
-            }
-
-            if (F.isEmpty(nodes))
-                throw retryException("Failed to collect affinity nodes during broadcast [" +
-                    "cacheName=" + cctx.name() + ']');
-        }
-
-        int segmentsCount = segmentsCount();
-
-        List<SegmentKey> res = new ArrayList<>(nodes.size() * segmentsCount);
-
-        for (ClusterNode node : nodes) {
-            for (int seg = 0; seg < segmentsCount; seg++)
-                res.add(new SegmentKey(node, seg));
-        }
-
-        return res;
-    }
-
-    /**
-     * @param cctx Cache context.
-     * @param qctx Query context.
-     * @param affKeyObj Affinity key.
-     * @param isLocalQry Local query flag.
-     * @return Segment key for Affinity key.
-     */
-    private SegmentKey rangeSegment(GridCacheContext<?, ?> cctx, GridH2QueryContext qctx, Object affKeyObj, boolean isLocalQry) {
-        assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj;
-
-        ClusterNode node;
-
-        int partition = cctx.affinity().partition(affKeyObj);
-
-        if (isLocalQry) {
-            if (qctx.partitionsMap() != null) {
-                // If we have explicit partitions map, we have to use it to calculate affinity node.
-                UUID nodeId = qctx.nodeForPartition(partition, cctx);
-
-                if(!cctx.localNodeId().equals(nodeId))
-                    return null; // Prevent remote index call for local queries.
-            }
-
-            if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, qctx.topologyVersion()))
-                return null;
-
-            node = cctx.localNode();
-        }
-        else{
-            if (qctx.partitionsMap() != null) {
-                // If we have explicit partitions map, we have to use it to calculate affinity node.
-                UUID nodeId = qctx.nodeForPartition(partition, cctx);
-
-            node = cctx.discovery().node(nodeId);
-        }
-        else // Get primary node for current topology version.
-            node = cctx.affinity().primaryByKey(affKeyObj, qctx.topologyVersion());
-
-            if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
-                throw retryException("Failed to get primary node by key for range segment.");
-        }
-
-        return new SegmentKey(node, segmentForPartition(partition));
-    }
-
-    /** */
-    protected class SegmentKey {
-        /** */
-        final ClusterNode node;
-
-        /** */
-        final int segmentId;
-
-        SegmentKey(ClusterNode node, int segmentId) {
-            assert node != null;
-
-            this.node = node;
-            this.segmentId = segmentId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            SegmentKey key = (SegmentKey)o;
-
-            return segmentId == key.segmentId && node.id().equals(key.node.id());
-
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int result = node.hashCode();
-            result = 31 * result + segmentId;
-            return result;
-        }
-    }
-
-    /**
-     * @param row Row.
-     * @return Row message.
-     */
-    private GridH2RowMessage toRowMessage(Row row) {
-        if (row == null)
-            return null;
-
-        int cols = row.getColumnCount();
-
-        assert cols > 0 : cols;
-
-        List<GridH2ValueMessage> vals = new ArrayList<>(cols);
-
-        for (int i = 0; i < cols; i++) {
-            try {
-                vals.add(GridH2ValueMessageFactory.toMessage(row.getValue(i)));
-            }
-            catch (IgniteCheckedException e) {
-                throw new CacheException(e);
-            }
-        }
-
-        GridH2RowMessage res = new GridH2RowMessage();
-
-        res.values(vals);
-
-        return res;
-    }
-
-    /**
      * @param msg Row message.
      * @return Search row.
      */
@@ -723,7 +521,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param row Search row.
      * @return Row message.
      */
-    private GridH2RowMessage toSearchRowMessage(SearchRow row) {
+    public GridH2RowMessage toSearchRowMessage(SearchRow row) {
         if (row == null)
             return null;
 
@@ -766,41 +564,15 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * @param msg Message.
-     * @return Row.
+     * @return Index segments count.
      */
-    private Row toRow(GridH2RowMessage msg) {
-        if (msg == null)
-            return null;
-
-        GridKernalContext ctx = kernalContext();
-
-        List<GridH2ValueMessage> vals = msg.values();
-
-        assert !F.isEmpty(vals) : vals;
-
-        Value[] vals0 = new Value[vals.size()];
-
-        for (int i = 0; i < vals0.length; i++) {
-            try {
-                vals0[i] = vals.get(i).value(ctx);
-            }
-            catch (IgniteCheckedException e) {
-                throw new CacheException(e);
-            }
-        }
-
-        return database.createRow(vals0, MEMORY_CALCULATE);
-    }
-
-    /** @return Index segments count. */
-    protected abstract int segmentsCount();
+    public abstract int segmentsCount();
 
     /**
      * @param partition Partition idx.
      * @return Segment ID for given key
      */
-    protected int segmentForPartition(int partition){
+    public int segmentForPartition(int partition){
         return segmentsCount() == 1 ? 0 : (partition % segmentsCount());
     }
 
@@ -808,6 +580,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param row Table row.
      * @return Segment ID for given row.
      */
+    @SuppressWarnings("IfMayBeConditional")
     protected int segmentForRow(SearchRow row) {
         assert row != null;
 
@@ -831,722 +604,33 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * Simple cursor from a single node.
-     */
-    private static class UnicastCursor implements Cursor {
-        /** */
-        final int rangeId;
-
-        /** */
-        RangeStream stream;
-
-        /**
-         * @param rangeId Range ID.
-         * @param keys Remote index segment keys.
-         * @param rangeStreams Range streams.
-         */
-        UnicastCursor(int rangeId, List<SegmentKey> keys, Map<SegmentKey, RangeStream> rangeStreams) {
-            assert keys.size() == 1;
-
-            this.rangeId = rangeId;
-            this.stream = rangeStreams.get(F.first(keys));
-
-            assert stream != null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            return stream.next(rangeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Row get() {
-            return stream.get(rangeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public SearchRow getSearchRow() {
-            return get();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean previous() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    /**
-     * Merge cursor from multiple nodes.
-     */
-    private class BroadcastCursor implements Cursor, Comparator<RangeStream> {
-        /** */
-        final int rangeId;
-
-        /** */
-        final RangeStream[] streams;
-
-        /** */
-        boolean first = true;
-
-        /** */
-        int off;
-
-        /**
-         * @param rangeId Range ID.
-         * @param segmentKeys Remote nodes.
-         * @param rangeStreams Range streams.
-         */
-        BroadcastCursor(int rangeId, Collection<SegmentKey> segmentKeys, Map<SegmentKey, RangeStream> rangeStreams) {
-
-            this.rangeId = rangeId;
-
-            streams = new RangeStream[segmentKeys.size()];
-
-            int i = 0;
-
-            for (SegmentKey segmentKey : segmentKeys) {
-                RangeStream stream = rangeStreams.get(segmentKey);
-
-                assert stream != null;
-
-                streams[i++] = stream;
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public int compare(RangeStream o1, RangeStream o2) {
-            if (o1 == o2)
-                return 0;
-
-            // Nulls are at the beginning of array.
-            if (o1 == null)
-                return -1;
-
-            if (o2 == null)
-                return 1;
-
-            return compareRows(o1.get(rangeId), o2.get(rangeId));
-        }
-
-        /**
-         * Try to fetch the first row.
-         *
-         * @return {@code true} If we were able to find at least one row.
-         */
-        private boolean goFirst() {
-            // Fetch first row from all the streams and sort them.
-            for (int i = 0; i < streams.length; i++) {
-                if (!streams[i].next(rangeId)) {
-                    streams[i] = null;
-                    off++; // After sorting this offset will cut off all null elements at the beginning of array.
-                }
-            }
-
-            if (off == streams.length)
-                return false;
-
-            Arrays.sort(streams, this);
-
-            return true;
-        }
-
-        /**
-         * Fetch next row.
-         *
-         * @return {@code true} If we were able to find at least one row.
-         */
-        private boolean goNext() {
-            assert off != streams.length;
-
-            if (!streams[off].next(rangeId)) {
-                // Next row from current min stream was not found -> nullify that stream and bump offset forward.
-                streams[off] = null;
-
-                return ++off != streams.length;
-            }
-
-            // Bubble up current min stream with respect to fetched row to achieve correct sort order of streams.
-            bubbleUp(streams, off, this);
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            if (first) {
-                first = false;
-
-                return goFirst();
-            }
-
-            return goNext();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Row get() {
-            return streams[off].get(rangeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public SearchRow getSearchRow() {
-            return get();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean previous() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    /**
-     * Index lookup batch.
-     */
-    private class DistributedLookupBatch implements IndexLookupBatch {
-        /** */
-        final GridCacheContext<?,?> cctx;
-
-        /** */
-        final boolean ucast;
-
-        /** */
-        final int affColId;
-
-        /** */
-        GridH2QueryContext qctx;
-
-        /** */
-        int batchLookupId;
-
-        /** */
-        Map<SegmentKey, RangeStream> rangeStreams = Collections.emptyMap();
-
-        /** */
-        List<SegmentKey> broadcastSegments;
-
-        /** */
-        List<Future<Cursor>> res = Collections.emptyList();
-
-        /** */
-        boolean batchFull;
-
-        /** */
-        boolean findCalled;
-
-        /**
-         * @param cctx Cache Cache context.
-         * @param ucast Unicast or broadcast query.
-         * @param affColId Affinity column ID.
-         */
-        DistributedLookupBatch(GridCacheContext<?, ?> cctx, boolean ucast, int affColId) {
-            this.cctx = cctx;
-            this.ucast = ucast;
-            this.affColId = affColId;
-        }
-
-        /**
-         * @param firstRow First row.
-         * @param lastRow Last row.
-         * @return Affinity key or {@code null}.
-         */
-        private Object getAffinityKey(SearchRow firstRow, SearchRow lastRow) {
-            if (affColId == COL_NOT_EXISTS)
-                return null;
-
-            if (firstRow == null || lastRow == null)
-                return null;
-
-            Value affKeyFirst = firstRow.getValue(affColId);
-            Value affKeyLast = lastRow.getValue(affColId);
-
-            if (affKeyFirst != null && equal(affKeyFirst, affKeyLast))
-                return affKeyFirst == ValueNull.INSTANCE ? EXPLICIT_NULL : affKeyFirst.getObject();
-
-            if (getTable().rowDescriptor().isKeyColumn(affColId))
-                return null;
-
-            // Try to extract affinity key from primary key.
-            Value pkFirst = firstRow.getValue(KEY_COL);
-            Value pkLast = lastRow.getValue(KEY_COL);
-
-            if (pkFirst == ValueNull.INSTANCE || pkLast == ValueNull.INSTANCE)
-                return EXPLICIT_NULL;
-
-            if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast))
-                return null;
-
-            Object pkAffKeyFirst = cctx.affinity().affinityKey(pkFirst.getObject());
-            Object pkAffKeyLast = cctx.affinity().affinityKey(pkLast.getObject());
-
-            if (pkAffKeyFirst == null || pkAffKeyLast == null)
-                throw new CacheException("Cache key without affinity key.");
-
-            if (pkAffKeyFirst.equals(pkAffKeyLast))
-                return pkAffKeyFirst;
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("ForLoopReplaceableByForEach")
-        @Override public boolean addSearchRows(SearchRow firstRow, SearchRow lastRow) {
-            if (qctx == null || findCalled) {
-                if (qctx == null) {
-                    // It is the first call after query begin (may be after reuse),
-                    // reinitialize query context and result.
-                    qctx = GridH2QueryContext.get();
-                    res = new ArrayList<>();
-
-                    assert qctx != null;
-                    assert !findCalled;
-                }
-                else {
-                    // Cleanup after the previous lookup phase.
-                    assert batchLookupId != 0;
-
-                    findCalled = false;
-                    qctx.putStreams(batchLookupId, null);
-                    res.clear();
-                }
-
-                // Reinitialize for the next lookup phase.
-                batchLookupId = qctx.nextBatchLookupId();
-                rangeStreams = new HashMap<>();
-            }
-
-            Object affKey = getAffinityKey(firstRow, lastRow);
-
-            boolean locQry = localQuery();
-
-            List<SegmentKey> segmentKeys;
-
-            if (affKey != null) {
-                // Affinity key is provided.
-                if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything.
-                    return false;
-
-                segmentKeys = F.asList(rangeSegment(cctx, qctx, affKey, locQry));
-            }
-            else {
-                // Affinity key is not provided or is not the same in upper and lower bounds, we have to broadcast.
-                if (broadcastSegments == null)
-                    broadcastSegments = broadcastSegments(qctx, cctx, locQry);
-
-                segmentKeys = broadcastSegments;
-            }
-
-            if (locQry && segmentKeys.isEmpty())
-                return false; // Nothing to do
-
-            assert !F.isEmpty(segmentKeys) : segmentKeys;
-
-            final int rangeId = res.size();
-
-            // Create messages.
-            GridH2RowMessage first = toSearchRowMessage(firstRow);
-            GridH2RowMessage last = toSearchRowMessage(lastRow);
-
-            // Range containing upper and lower bounds.
-            GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last);
-
-            // Add range to every message of every participating node.
-            for (int i = 0; i < segmentKeys.size(); i++) {
-                SegmentKey segmentKey = segmentKeys.get(i);
-                assert segmentKey != null;
-
-                RangeStream stream = rangeStreams.get(segmentKey);
-
-                List<GridH2RowRangeBounds> bounds;
-
-                if (stream == null) {
-                    stream = new RangeStream(qctx, segmentKey.node);
-
-                    stream.req = createRequest(qctx, batchLookupId, segmentKey.segmentId);
-                    stream.req.bounds(bounds = new ArrayList<>());
-
-                    rangeStreams.put(segmentKey, stream);
-                }
-                else
-                    bounds = stream.req.bounds();
-
-                bounds.add(rangeBounds);
-
-                // If at least one node will have a full batch then we are ok.
-                if (bounds.size() >= qctx.pageSize())
-                    batchFull = true;
-            }
-
-            Future<Cursor> fut = new DoneFuture<>(segmentKeys.size() == 1 ?
-                new UnicastCursor(rangeId, segmentKeys, rangeStreams) :
-                new BroadcastCursor(rangeId, segmentKeys, rangeStreams));
-
-            res.add(fut);
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isBatchFull() {
-            return batchFull;
-        }
-
-        /**
-         * @return {@code True} if local query execution is enforced.
-         */
-        private boolean localQuery() {
-            assert qctx != null : "Missing query context: " + this;
-
-            return qctx.distributedJoinMode() == LOCAL_ONLY;
-        }
-
-        /**
-         *
-         */
-        private void startStreams() {
-            if (rangeStreams.isEmpty()) {
-                assert res.isEmpty();
-
-                return;
-            }
-
-            qctx.putStreams(batchLookupId, rangeStreams);
-
-            // Start streaming.
-            for (RangeStream stream : rangeStreams.values())
-                stream.start();
-        }
-
-        /** {@inheritDoc} */
-        @Override public List<Future<Cursor>> find() {
-            batchFull = false;
-            findCalled = true;
-
-            startStreams();
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void reset(boolean beforeQry) {
-            if (beforeQry || qctx == null) // Query context can be null if addSearchRows was never called.
-                return;
-
-            assert batchLookupId != 0;
-
-            // Do cleanup after the query run.
-            qctx.putStreams(batchLookupId, null);
-            qctx = null; // The same query can be reused multiple times for different query contexts.
-            batchLookupId = 0;
-
-            rangeStreams = Collections.emptyMap();
-            broadcastSegments = null;
-            batchFull = false;
-            findCalled = false;
-            res = Collections.emptyList();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String getPlanSQL() {
-            return ucast ? "unicast" : "broadcast";
-        }
-    }
-
-    /**
-     * Per node range stream.
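+     * Find rows within the given bounds in a single index segment (distributed joins).
+     *
+     * @param bounds Range bounds (first and last search rows).
+     * @param segment Index segment to read from.
+     * @param filter Row filter passed to the tree lookup.
+     * @return Iterator over the rows found in the range.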
      */
-    private class RangeStream {
-        /** */
-        final GridH2QueryContext qctx;
-
-        /** */
-        final ClusterNode node;
-
-        /** */
-        GridH2IndexRangeRequest req;
-
-        /** */
-        int remainingRanges;
-
-        /** */
-        final BlockingQueue<GridH2IndexRangeResponse> respQueue = new LinkedBlockingQueue<>();
-
-        /** */
-        Iterator<GridH2RowRange> ranges = emptyIterator();
-
-        /** */
-        Cursor cursor = GridH2Cursor.EMPTY;
-
-        /** */
-        int cursorRangeId = -1;
-
-        /**
-         * @param qctx Query context.
-         * @param node Node.
-         */
-        RangeStream(GridH2QueryContext qctx, ClusterNode node) {
-            this.node = node;
-            this.qctx = qctx;
-        }
-
-        /**
-         * Start streaming.
-         */
-        private void start() {
-            assert ctx != null;
-            assert log != null: getName();
-
-            remainingRanges = req.bounds().size();
-
-            assert remainingRanges > 0;
-
-            if (log.isDebugEnabled())
-                log.debug("Starting stream: [node=" + node + ", req=" + req + "]");
-
-            send(singletonList(node), req);
-        }
-
-        /**
-         * @param msg Response.
-         */
-        public void onResponse(GridH2IndexRangeResponse msg) {
-            respQueue.add(msg);
-        }
-
-        /**
-         * @return Response.
-         */
-        private GridH2IndexRangeResponse awaitForResponse() {
-            assert remainingRanges > 0;
-
-            final long start = U.currentTimeMillis();
-
-            for (int attempt = 0;; attempt++) {
-                if (qctx.isCleared())
-                    throw retryException("Query is cancelled.");
-
-                if (kernalContext().isStopping())
-                    throw retryException("Local node is stopping.");
-
-                GridH2IndexRangeResponse res;
-
-                try {
-                    res = respQueue.poll(500, TimeUnit.MILLISECONDS);
-                }
-                catch (InterruptedException ignored) {
-                    throw retryException("Interrupted while waiting for reply.");
-                }
-
-                if (res != null) {
-                    switch (res.status()) {
-                        case STATUS_OK:
-                            List<GridH2RowRange> ranges0 = res.ranges();
-
-                            remainingRanges -= ranges0.size();
-
-                            if (ranges0.get(ranges0.size() - 1).isPartial())
-                                remainingRanges++;
-
-                            if (remainingRanges > 0) {
-                                if (req.bounds() != null)
-                                    req = createRequest(qctx, req.batchLookupId(), req.segment());
-
-                                // Prefetch next page.
-                                send(singletonList(node), req);
-                            }
-                            else
-                                req = null;
-
-                            return res;
-
-                        case STATUS_NOT_FOUND:
-                            if (req == null || req.bounds() == null) // We have already received the first response.
-                                throw retryException("Failure on remote node.");
-
-                            if (U.currentTimeMillis() - start > 30_000)
-                                throw retryException("Timeout reached.");
-
-                            try {
-                                U.sleep(20 * attempt);
-                            }
-                            catch (IgniteInterruptedCheckedException e) {
-                                throw new IgniteInterruptedException(e.getMessage());
-                            }
-
-                            // Retry to send the request once more after some time.
-                            send(singletonList(node), req);
-
-                            break;
-
-                        case STATUS_ERROR:
-                            throw new CacheException(res.error());
-
-                        default:
-                            throw new IllegalStateException();
-                    }
-                }
-
-                if (!kernalContext().discovery().alive(node))
-                    throw retryException("Node has left topology: " + node.id());
-            }
-        }
-
-        /**
-         * @param rangeId Requested range ID.
-         * @return {@code true} If next row for the requested range was found.
-         */
-        private boolean next(final int rangeId) {
-            for (;;) {
-                if (rangeId == cursorRangeId) {
-                    if (cursor.next())
-                        return true;
-                }
-                else if (rangeId < cursorRangeId)
-                    return false;
-
-                cursor = GridH2Cursor.EMPTY;
-
-                while (!ranges.hasNext()) {
-                    if (remainingRanges == 0) {
-                        ranges = emptyIterator();
-
-                        return false;
-                    }
-
-                    ranges = awaitForResponse().ranges().iterator();
-                }
-
-                GridH2RowRange range = ranges.next();
-
-                cursorRangeId = range.rangeId();
-
-                if (!F.isEmpty(range.rows())) {
-                    final Iterator<GridH2RowMessage> it = range.rows().iterator();
-
-                    if (it.hasNext()) {
-                        cursor = new GridH2Cursor(new Iterator<Row>() {
-                            @Override public boolean hasNext() {
-                                return it.hasNext();
-                            }
-
-                            @Override public Row next() {
-                                // Lazily convert messages into real rows.
-                                return toRow(it.next());
-                            }
+    @SuppressWarnings("unchecked")
+    public Iterator<GridH2Row> findForSegment(GridH2RowRangeBounds bounds, int segment,
+        BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) {
+        SearchRow first = toSearchRow(bounds.first());
+        SearchRow last = toSearchRow(bounds.last());
 
-                            @Override public void remove() {
-                                throw new UnsupportedOperationException();
-                            }
-                        });
-                    }
-                }
-            }
-        }
+        IgniteTree t = treeForRead(segment);
 
-        /**
-         * @param rangeId Requested range ID.
-         * @return Current row.
-         */
-        private Row get(int rangeId) {
-            assert rangeId == cursorRangeId;
+        try {
+            GridCursor<GridH2Row> range = ((BPlusTree)t).find(first, last, filter, null);
 
-            return cursor.get();
-        }
-    }
+            if (range == null)
+                range = H2Utils.EMPTY_CURSOR;
 
-    /**
-     * Bounds iterator.
-     */
-    private class RangeSource {
-        /** */
-        Iterator<GridH2RowRangeBounds> boundsIter;
-
-        /** */
-        int curRangeId = -1;
-
-        /** */
-        private final int segment;
-
-        /** */
-        private final BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter;
-
-        /** Iterator. */
-        Iterator<GridH2Row> iter = emptyIterator();
-
-        /**
-         * @param bounds Bounds.
-         * @param segment Segment.
-         * @param filter Filter.
-         */
-        RangeSource(Iterable<GridH2RowRangeBounds> bounds, int segment, BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) {
-            this.segment = segment;
-            this.filter = filter;
-            boundsIter = bounds.iterator();
-        }
+            H2Cursor cur = new H2Cursor(range);
 
-        /**
-         * @return {@code true} If there are more rows in this source.
-         */
-        public boolean hasMoreRows() throws IgniteCheckedException {
-            return boundsIter.hasNext() || iter.hasNext();
+            return new CursorIteratorWrapper(cur);
         }
-
-        /**
-         * @param maxRows Max allowed rows.
-         * @return Range.
-         */
-        public GridH2RowRange next(int maxRows) {
-            assert maxRows > 0 : maxRows;
-
-            for (; ; ) {
-                if (iter.hasNext()) {
-                    // Here we are getting last rows from previously partially fetched range.
-                    List<GridH2RowMessage> rows = new ArrayList<>();
-
-                    GridH2RowRange nextRange = new GridH2RowRange();
-
-                    nextRange.rangeId(curRangeId);
-                    nextRange.rows(rows);
-
-                    do {
-                        rows.add(toRowMessage(iter.next()));
-                    }
-                    while (rows.size() < maxRows && iter.hasNext());
-
-                    if (iter.hasNext())
-                        nextRange.setPartial();
-                    else
-                        iter = emptyIterator();
-
-                    return nextRange;
-                }
-
-                iter = emptyIterator();
-
-                if (!boundsIter.hasNext()) {
-                    boundsIter = emptyIterator();
-
-                    return null;
-                }
-
-                GridH2RowRangeBounds bounds = boundsIter.next();
-
-                curRangeId = bounds.rangeId();
-
-                SearchRow first = toSearchRow(bounds.first());
-                SearchRow last = toSearchRow(bounds.last());
-
-                IgniteTree t = treeForRead(segment);
-
-                iter = new CursorIteratorWrapper(doFind0(t, first, last, filter));
-
-                if (!iter.hasNext()) {
-                    // We have to return empty range here.
-                    GridH2RowRange emptyRange = new GridH2RowRange();
-
-                    emptyRange.rangeId(curRangeId);
-
-                    return emptyRange;
-                }
-            }
+        catch (IgniteCheckedException e) {
+            throw DbException.convert(e);
         }
     }
 
@@ -1559,21 +643,6 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * @param t Tree.
-     * @param first Lower bound.
-     * @param last Upper bound always inclusive.
-     * @param filter Filter.
-     * @return Iterator over rows in given range.
-     */
-    protected H2Cursor doFind0(
-        IgniteTree t,
-        @Nullable SearchRow first,
-        @Nullable SearchRow last,
-        BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter) {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
      * Re-assign column ids after removal of column(s).
      */
     public void refreshColumnIds() {
@@ -1582,72 +651,4 @@ public abstract class GridH2IndexBase extends BaseIndex {
         for (int pos = 0; pos < columnIds.length; ++pos)
             columnIds[pos] = columns[pos].getColumnId();
     }
-
-    /**
-     * Create retry exception for distributed join.
-     *
-     * @param msg Message.
-     * @return Exception.
-     */
-    private GridH2RetryException retryException(String msg) {
-        return new GridH2RetryException(msg);
-    }
-
-    /**
-     *
-     */
-    private static final class CursorIteratorWrapper implements Iterator<GridH2Row> {
-        /** */
-        private final H2Cursor cursor;
-
-        /** Next element. */
-        private GridH2Row next;
-
-        /**
-         * @param cursor Cursor.
-         */
-        private CursorIteratorWrapper(H2Cursor cursor) {
-            assert cursor != null;
-
-            this.cursor = cursor;
-
-            if (cursor.next())
-                next = (GridH2Row)cursor.get();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNext() {
-            return next != null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridH2Row next() {
-            GridH2Row res = next;
-
-            if (cursor.next())
-                next = (GridH2Row)cursor.get();
-            else
-                next = null;
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void remove() {
-            throw new UnsupportedOperationException("operation is not supported");
-        }
-    }
-
-    /** Empty cursor. */
-    protected static final GridCursor<GridH2Row> EMPTY_CURSOR = new GridCursor<GridH2Row>() {
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridH2Row get() {
-            return null;
-        }
-    };
 }
index f12c0f3..babced3 100644 (file)
@@ -27,13 +27,16 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode;
+import org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel;
+import org.apache.ignite.internal.processors.query.h2.opt.join.SourceKey;
 import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 
 /**
@@ -44,10 +47,10 @@ public class GridH2QueryContext {
     private static final ThreadLocal<GridH2QueryContext> qctx = new ThreadLocal<>();
 
     /** */
-    private static final ConcurrentMap<Key, GridH2QueryContext> qctxs = new ConcurrentHashMap<>();
+    private static final ConcurrentMap<QueryContextKey, GridH2QueryContext> qctxs = new ConcurrentHashMap<>();
 
     /** */
-    private final Key key;
+    private final QueryContextKey key;
 
     /** */
     private volatile boolean cleared;
@@ -83,7 +86,7 @@ public class GridH2QueryContext {
     private int pageSize;
 
     /** */
-    private GridH2CollocationModel qryCollocationMdl;
+    private CollocationModel qryCollocationMdl;
 
     /** */
     private MvccSnapshot mvccSnapshot;
@@ -100,7 +103,7 @@ public class GridH2QueryContext {
     public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
         assert type != MAP;
 
-        key = new Key(locNodeId, nodeId, qryId, 0, type);
+        key = new QueryContextKey(locNodeId, nodeId, qryId, 0, type);
     }
 
     /**
@@ -117,7 +120,7 @@ public class GridH2QueryContext {
         GridH2QueryType type) {
         assert segmentId == 0 || type == MAP;
 
-        key = new Key(locNodeId, nodeId, qryId, segmentId, type);
+        key = new QueryContextKey(locNodeId, nodeId, qryId, segmentId, type);
     }
 
     /**
@@ -141,34 +144,34 @@ public class GridH2QueryContext {
      * @return Type.
      */
     public GridH2QueryType type() {
-        return key.type;
+        return key.type();
     }
 
     /**
      * @return Origin node ID.
      */
     public UUID originNodeId() {
-        return key.nodeId;
+        return key.nodeId();
     }
 
     /**
      * @return Query request ID.
      */
     public long queryId() {
-        return key.qryId;
+        return key.queryId();
     }
 
     /**
      * @return Query collocation model.
      */
-    public GridH2CollocationModel queryCollocationModel() {
+    public CollocationModel queryCollocationModel() {
         return qryCollocationMdl;
     }
 
     /**
      * @param qryCollocationMdl Query collocation model.
      */
-    public void queryCollocationModel(GridH2CollocationModel qryCollocationMdl) {
+    public void queryCollocationModel(CollocationModel qryCollocationMdl) {
         this.qryCollocationMdl = qryCollocationMdl;
     }
 
@@ -268,7 +271,7 @@ public class GridH2QueryContext {
 
     /** @return index segment ID. */
     public int segment() {
-        return key.segmentId;
+        return key.segmentId();
     }
 
     /**
@@ -351,7 +354,7 @@ public class GridH2QueryContext {
          assert qctx.get() == null;
 
          // We need MAP query context to be available to other threads to run distributed joins.
-         if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null)
+         if (x.key.type() == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null)
              throw new IllegalStateException("Query context is already set.");
 
          qctx.set(x);
@@ -378,8 +381,12 @@ public class GridH2QueryContext {
     public static boolean clear(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
         boolean res = false;
 
-        for (Key key : qctxs.keySet()) {
-            if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId) && key.qryId == qryId && key.type == type)
+        for (QueryContextKey key : qctxs.keySet()) {
+            if (key.localNodeId().equals(locNodeId) &&
+                key.nodeId().equals(nodeId) &&
+                key.queryId() == qryId &&
+                key.type() == type
+            )
                 res |= doClear(key, false);
         }
 
@@ -391,8 +398,8 @@ public class GridH2QueryContext {
      * @param nodeStop Node is stopping.
      * @return {@code True} if context was found.
      */
-    private static boolean doClear(Key key, boolean nodeStop) {
-        assert key.type == MAP : key.type;
+    private static boolean doClear(QueryContextKey key, boolean nodeStop) {
+        assert key.type() == MAP : key.type();
 
         GridH2QueryContext x = qctxs.remove(key);
 
@@ -436,8 +443,8 @@ public class GridH2QueryContext {
      * @param nodeId Dead node ID.
      */
     public static void clearAfterDeadNode(UUID locNodeId, UUID nodeId) {
-        for (Key key : qctxs.keySet()) {
-            if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId))
+        for (QueryContextKey key : qctxs.keySet()) {
+            if (key.localNodeId().equals(locNodeId) && key.nodeId().equals(nodeId))
                 doClear(key, false);
         }
     }
@@ -446,8 +453,8 @@ public class GridH2QueryContext {
      * @param locNodeId Local node ID.
      */
     public static void clearLocalNodeStop(UUID locNodeId) {
-        for (Key key : qctxs.keySet()) {
-            if (key.locNodeId.equals(locNodeId))
+        for (QueryContextKey key : qctxs.keySet()) {
+            if (key.localNodeId().equals(locNodeId))
                 doClear(key, true);
         }
     }
@@ -478,7 +485,7 @@ public class GridH2QueryContext {
         int segmentId,
         GridH2QueryType type
     ) {
-        return qctxs.get(new Key(locNodeId, nodeId, qryId, segmentId, type));
+        return qctxs.get(new QueryContextKey(locNodeId, nodeId, qryId, segmentId, type));
     }
 
     /**
@@ -537,116 +544,4 @@ public class GridH2QueryContext {
         return S.toString(GridH2QueryContext.class, this);
     }
 
-    /**
-     * Unique key for the query context.
-     */
-    private static class Key {
-        /** */
-        private final UUID locNodeId;
-
-        /** */
-        private final UUID nodeId;
-
-        /** */
-        private final long qryId;
-
-        /** */
-        private final int segmentId;
-
-        /** */
-        private final GridH2QueryType type;
-
-        /**
-         * @param locNodeId Local node ID.
-         * @param nodeId The node who initiated the query.
-         * @param qryId The query ID.
-         * @param segmentId Index segment ID.
-         * @param type Query type.
-         */
-        private Key(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
-            assert locNodeId != null;
-            assert nodeId != null;
-            assert type != null;
-
-            this.locNodeId = locNodeId;
-            this.nodeId = nodeId;
-            this.qryId = qryId;
-            this.segmentId = segmentId;
-            this.type = type;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            Key key = (Key)o;
-
-            return qryId == key.qryId && nodeId.equals(key.nodeId) && type == key.type &&
-               locNodeId.equals(key.locNodeId) ;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = locNodeId.hashCode();
-
-            res = 31 * res + nodeId.hashCode();
-            res = 31 * res + (int)(qryId ^ (qryId >>> 32));
-            res = 31 * res + type.hashCode();
-            res = 31 * res + segmentId;
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Key.class, this);
-        }
-    }
-
-    /**
-     * Key for source.
-     */
-    private static final class SourceKey {
-        /** */
-        UUID ownerId;
-
-        /** */
-        int segmentId;
-
-        /** */
-        int batchLookupId;
-
-        /**
-         * @param ownerId Owner node ID.
-         * @param segmentId Index segment ID.
-         * @param batchLookupId Batch lookup ID.
-         */
-        SourceKey(UUID ownerId, int segmentId, int batchLookupId) {
-            this.ownerId = ownerId;
-            this.segmentId = segmentId;
-            this.batchLookupId = batchLookupId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (o == null || !(o instanceof SourceKey))
-                return false;
-
-            SourceKey srcKey = (SourceKey)o;
-
-            return batchLookupId == srcKey.batchLookupId && segmentId == srcKey.segmentId &&
-                ownerId.equals(srcKey.ownerId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int hash = ownerId.hashCode();
-            hash = 31 * hash + segmentId;
-            return 31 * hash + batchLookupId;
-        }
-    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextKey.java
new file mode 100644 (file)
index 0000000..ad0ff20
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.UUID;
+
+/**
+ * Unique key for the query context.
+ */
+public class QueryContextKey {
+    /** Local node ID. */
+    private final UUID locNodeId;
+
+    /** ID of the node that initiated the query. */
+    private final UUID nodeId;
+
+    /** Query ID. */
+    private final long qryId;
+
+    /** Index segment ID. */
+    private final int segmentId;
+
+    /** Query type. */
+    private final GridH2QueryType type;
+
+    /**
+     * @param locNodeId Local node ID, must not be {@code null}.
+     * @param nodeId The node who initiated the query, must not be {@code null}.
+     * @param qryId The query ID.
+     * @param segmentId Index segment ID.
+     * @param type Query type, must not be {@code null}.
+     */
+    QueryContextKey(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
+        assert locNodeId != null;
+        assert nodeId != null;
+        assert type != null;
+
+        this.locNodeId = locNodeId;
+        this.nodeId = nodeId;
+        this.qryId = qryId;
+        this.segmentId = segmentId;
+        this.type = type;
+    }
+
+    /**
+     * @return Local node ID.
+     */
+    public UUID localNodeId() {
+        return locNodeId;
+    }
+
+    /**
+     * @return ID of the node that initiated the query.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public long queryId() {
+        return qryId;
+    }
+
+    /**
+     * @return Index segment ID.
+     */
+    public int segmentId() {
+        return segmentId;
+    }
+
+    /**
+     * @return Query type.
+     */
+    public GridH2QueryType type() {
+        return type;
+    }
+
+    /** {@inheritDoc} Compares every field mixed into {@link #hashCode()}, including the segment ID. */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        QueryContextKey other = (QueryContextKey)o;
+
+        return qryId == other.qryId && segmentId == other.segmentId && type == other.type &&
+            nodeId.equals(other.nodeId) && locNodeId.equals(other.locNodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = locNodeId.hashCode();
+
+        res = 31 * res + nodeId.hashCode();
+        res = 31 * res + (int)(qryId ^ (qryId >>> 32));
+        res = 31 * res + type.hashCode();
+        res = 31 * res + segmentId;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(QueryContextKey.class, this);
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/BroadcastCursor.java
new file mode 100644 (file)
index 0000000..632d72a
--- /dev/null
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.h2.index.Cursor;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+
+/**
+ * Merge cursor from multiple nodes.
+ */
+public class BroadcastCursor implements Cursor, Comparator<RangeStream> {
+    /** Index. */
+    private final GridH2IndexBase idx;
+
+    /** Range ID passed to every stream call made by this cursor. */
+    private final int rangeId;
+
+    /** Streams to merge, one per segment key; a slot is set to {@code null} once its stream has no more rows. */
+    private final RangeStream[] streams;
+
+    /** {@code true} until the first {@link #next()} call. */
+    private boolean first = true;
+
+    /** Index in {@link #streams} of the stream that {@link #get()} reads the current row from. */
+    private int off;
+
+    /**
+     * @param idx Index used to compare rows of different streams.
+     * @param rangeId Range ID.
+     * @param segmentKeys Segment keys to merge; each one must have a stream in {@code rangeStreams}.
+     * @param rangeStreams Range streams by segment key.
+     */
+    public BroadcastCursor(GridH2IndexBase idx, int rangeId, Collection<SegmentKey> segmentKeys,
+        Map<SegmentKey, RangeStream> rangeStreams) {
+        this.idx = idx;
+        this.rangeId = rangeId;
+
+        streams = new RangeStream[segmentKeys.size()];
+
+        int i = 0;
+
+        for (SegmentKey segmentKey : segmentKeys) {
+            RangeStream stream = rangeStreams.get(segmentKey);
+
+            assert stream != null;
+
+            streams[i++] = stream;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compare(RangeStream o1, RangeStream o2) {
+        if (o1 == o2)
+            return 0;
+
+        // Nulls are at the beginning of array.
+        if (o1 == null)
+            return -1;
+
+        if (o2 == null)
+            return 1;
+
+        return idx.compareRows(o1.get(rangeId), o2.get(rangeId));
+    }
+
+    /**
+     * Fetches the first row from every stream, nulls out the streams that have none and sorts the array.
+     * @return {@code true} If we were able to find at least one row.
+     */
+    private boolean goFirst() {
+        // Fetch first row from all the streams and sort them.
+        for (int i = 0; i < streams.length; i++) {
+            if (!streams[i].next(rangeId)) {
+                streams[i] = null;
+                off++; // After sorting this offset will cut off all null elements at the beginning of array.
+            }
+        }
+
+        if (off == streams.length)
+            return false;
+
+        Arrays.sort(streams, this);
+
+        return true;
+    }
+
+    /**
+     * Fetches the next row from the stream at the current offset, moving on to the following stream if it has none.
+     *
+     * @return {@code true} If a row is available, {@code false} if the last stream has been exhausted.
+     */
+    private boolean goNext() {
+        assert off != streams.length;
+
+        if (!streams[off].next(rangeId)) {
+            // Next row from current min stream was not found -> nullify that stream and bump offset forward.
+            streams[off] = null;
+
+            return ++off != streams.length;
+        }
+
+        // Bubble up current min stream with respect to fetched row to achieve correct sort order of streams.
+        GridH2IndexBase.bubbleUp(streams, off, this);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean next() {
+        if (first) {
+            first = false;
+
+            return goFirst();
+        }
+
+        return goNext();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Row get() {
+        return streams[off].get(rangeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public SearchRow getSearchRow() {
+        return get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean previous() {
+        throw new UnsupportedOperationException();
+    }
+}
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.h2.opt;
+package org.apache.ignite.internal.processors.query.h2.opt.join;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import javax.cache.CacheException;
+
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -42,7 +45,7 @@ import org.h2.table.TableView;
 /**
  * Collocation model for a query.
  */
-public final class GridH2CollocationModel {
+public final class CollocationModel {
     /** */
     public static final int MULTIPLIER_COLLOCATED = 1;
 
@@ -56,7 +59,7 @@ public final class GridH2CollocationModel {
     private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000;
 
     /** */
-    private final GridH2CollocationModel upper;
+    private final CollocationModel upper;
 
     /** */
     private final int filter;
@@ -71,13 +74,13 @@ public final class GridH2CollocationModel {
     private Type type;
 
     /** */
-    private GridH2CollocationModel[] children;
+    private CollocationModel[] children;
 
     /** */
     private TableFilter[] childFilters;
 
     /** */
-    private List<GridH2CollocationModel> unions;
+    private List<CollocationModel> unions;
 
     /** */
     private Select select;
@@ -91,7 +94,7 @@ public final class GridH2CollocationModel {
      * @param view This model will be a subquery (or top level query) and must contain child filters.
      * @param validate Query validation flag.
      */
-    private GridH2CollocationModel(GridH2CollocationModel upper, int filter, boolean view, boolean validate) {
+    private CollocationModel(CollocationModel upper, int filter, boolean view, boolean validate) {
         this.upper = upper;
         this.filter = filter;
         this.view = view;
@@ -163,12 +166,12 @@ public final class GridH2CollocationModel {
      * @param validate Query validation flag.
      * @return Created child collocation model.
      */
-    private static GridH2CollocationModel createChildModel(GridH2CollocationModel upper,
+    private static CollocationModel createChildModel(CollocationModel upper,
         int filter,
-        List<GridH2CollocationModel> unions,
+        List<CollocationModel> unions,
         boolean view,
         boolean validate) {
-        GridH2CollocationModel child = new GridH2CollocationModel(upper, filter, view, validate);
+        CollocationModel child = new CollocationModel(upper, filter, view, validate);
 
         if (unions != null) {
             // Bind created child to unions.
@@ -218,7 +221,7 @@ public final class GridH2CollocationModel {
             // We have to clone because H2 reuses array and reorders elements.
             this.childFilters = childFilters.clone();
 
-            children = new GridH2CollocationModel[childFilters.length];
+            children = new CollocationModel[childFilters.length];
         }
         else {
             assert this.childFilters.length == childFilters.length;
@@ -252,7 +255,7 @@ public final class GridH2CollocationModel {
             int maxMultiplier = MULTIPLIER_COLLOCATED;
 
             for (int i = 0; i < childFilters.length; i++) {
-                GridH2CollocationModel child = child(i, true);
+                CollocationModel child = child(i, true);
 
                 Type t = child.type(true);
 
@@ -340,7 +343,7 @@ public final class GridH2CollocationModel {
      */
     private boolean findPartitionedTableBefore(int f) {
         for (int i = 0; i < f; i++) {
-            GridH2CollocationModel child = child(i, true);
+            CollocationModel child = child(i, true);
 
             // The c can be null if it is not a GridH2Table and not a sub-query,
             // it is a some kind of function table or anything else that considered replicated.
@@ -413,7 +416,7 @@ public final class GridH2CollocationModel {
                         TableFilter prevJoin = expCol.getTableFilter();
 
                         if (prevJoin != null) {
-                            GridH2CollocationModel cm = child(indexOf(prevJoin), true);
+                            CollocationModel cm = child(indexOf(prevJoin), true);
 
                             // If the previous joined model is a subquery (view), we can not be sure that
                             // the found affinity column is the needed one, since we can select multiple
@@ -580,8 +583,8 @@ public final class GridH2CollocationModel {
      * @param create Create child if needed.
      * @return Child collocation.
      */
-    private GridH2CollocationModel child(int i, boolean create) {
-        GridH2CollocationModel child = children[i];
+    private CollocationModel child(int i, boolean create) {
+        CollocationModel child = children[i];
 
         if (child == null && create) {
             TableFilter f = childFilters[i];
@@ -615,7 +618,7 @@ public final class GridH2CollocationModel {
     /**
      * @return Unions list.
      */
-    private List<GridH2CollocationModel> getOrCreateUnions() {
+    private List<CollocationModel> getOrCreateUnions() {
         if (unions == null) {
             unions = new ArrayList<>(4);
 
@@ -633,9 +636,9 @@ public final class GridH2CollocationModel {
      * @param validate Query validation flag.
      * @return Collocation.
      */
-    public static GridH2CollocationModel buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info,
+    public static CollocationModel buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info,
         TableFilter[] filters, int filter, boolean validate) {
-        GridH2CollocationModel cm;
+        CollocationModel cm;
 
         if (info != null) {
             // Go up until we reach the root query.
@@ -662,12 +665,12 @@ public final class GridH2CollocationModel {
         // Handle union. We have to rely on fact that select will be the same on uppermost select.
         // For sub-queries we will drop collocation models, so that they will be recalculated anyways.
         if (cm.select != null && cm.select != select) {
-            List<GridH2CollocationModel> unions = cm.getOrCreateUnions();
+            List<CollocationModel> unions = cm.getOrCreateUnions();
 
             // Try to find this select in existing unions.
             // Start with 1 because at 0 it always will be c.
             for (int i = 1; i < unions.size(); i++) {
-                GridH2CollocationModel u = unions.get(i);
+                CollocationModel u = unions.get(i);
 
                 if (u.select == select) {
                     cm = u;
@@ -691,7 +694,7 @@ public final class GridH2CollocationModel {
      * @return {@code true} If the query is collocated.
      */
     public static boolean isCollocated(Query qry) {
-        GridH2CollocationModel mdl = buildCollocationModel(null, -1, qry, null, true);
+        CollocationModel mdl = buildCollocationModel(null, -1, qry, null, true);
 
         Type type = mdl.type(true);
 
@@ -710,10 +713,10 @@ public final class GridH2CollocationModel {
      * @param validate Query validation flag.
      * @return Built model.
      */
-    private static GridH2CollocationModel buildCollocationModel(GridH2CollocationModel upper,
+    private static CollocationModel buildCollocationModel(CollocationModel upper,
         int filter,
         Query qry,
-        List<GridH2CollocationModel> unions,
+        List<CollocationModel> unions,
         boolean validate) {
         if (qry.isUnion()) {
             if (unions == null)
@@ -721,8 +724,8 @@ public final class GridH2CollocationModel {
 
             SelectUnion union = (SelectUnion)qry;
 
-            GridH2CollocationModel left = buildCollocationModel(upper, filter, union.getLeft(), unions, validate);
-            GridH2CollocationModel right = buildCollocationModel(upper, filter, union.getRight(), unions, validate);
+            CollocationModel left = buildCollocationModel(upper, filter, union.getLeft(), unions, validate);
+            CollocationModel right = buildCollocationModel(upper, filter, union.getRight(), unions, validate);
 
             assert left != null;
             assert right != null;
@@ -739,7 +742,7 @@ public final class GridH2CollocationModel {
 
         TableFilter[] filters = list.toArray(new TableFilter[list.size()]);
 
-        GridH2CollocationModel cm = createChildModel(upper, filter, unions, true, validate);
+        CollocationModel cm = createChildModel(upper, filter, unions, true, validate);
 
         cm.childFilters(filters);
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CursorIteratorWrapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/CursorIteratorWrapper.java
new file mode 100644 (file)
index 0000000..5a174e8
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.internal.processors.query.h2.H2Cursor;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+
+import java.util.Iterator;
+
+/**
+ * Adapts an {@link H2Cursor} to the {@link Iterator} interface.
+ * One row is always fetched ahead, so {@link #hasNext()} never touches the cursor.
+ */
+public final class CursorIteratorWrapper implements Iterator<GridH2Row> {
+    /** Wrapped cursor. */
+    private final H2Cursor cursor;
+
+    /** Row fetched ahead for the following {@link #next()} call, {@code null} once the cursor has no more rows. */
+    private GridH2Row next;
+
+    /** @param cursor Cursor to wrap; it is advanced once right away to fetch the first row. */
+    public CursorIteratorWrapper(H2Cursor cursor) {
+        assert cursor != null;
+
+        this.cursor = cursor;
+
+        next = fetch();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() {
+        return next != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridH2Row next() {
+        GridH2Row cur = next;
+
+        next = fetch();
+
+        return cur;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove() {
+        throw new UnsupportedOperationException("operation is not supported");
+    }
+
+    /** @return Row under the cursor after advancing it, or {@code null} if the cursor could not advance. */
+    private GridH2Row fetch() {
+        return cursor.next() ? (GridH2Row)cursor.get() : null;
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
new file mode 100644 (file)
index 0000000..02d2e44
--- /dev/null
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.h2.index.Cursor;
+import org.h2.index.IndexLookupBatch;
+import org.h2.result.SearchRow;
+import org.h2.util.DoneFuture;
+import org.h2.value.Value;
+import org.h2.value.ValueNull;
+
+import javax.cache.CacheException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Future;
+
+import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.LOCAL_ONLY;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;
+
+/**
+ * Index lookup batch.
+ */
+public class DistributedLookupBatch implements IndexLookupBatch {
+    /** Index. */
+    private final GridH2IndexBase idx;
+
+    /** */
+    private final GridCacheContext<?,?> cctx;
+
+    /** */
+    private final boolean ucast;
+
+    /** */
+    private final int affColId;
+
+    /** */
+    private GridH2QueryContext qctx;
+
+    /** */
+    private int batchLookupId;
+
+    /** */
+    private Map<SegmentKey, RangeStream> rangeStreams = Collections.emptyMap();
+
+    /** */
+    private List<SegmentKey> broadcastSegments;
+
+    /** */
+    private List<Future<Cursor>> res = Collections.emptyList();
+
+    /** */
+    private boolean batchFull;
+
+    /** */
+    private boolean findCalled;
+
+    /**
+     * @param cctx Cache Cache context.
+     * @param ucast Unicast or broadcast query.
+     * @param affColId Affinity column ID.
+     */
+    public DistributedLookupBatch(GridH2IndexBase idx, GridCacheContext<?, ?> cctx, boolean ucast, int affColId) {
+        this.idx = idx;
+        this.cctx = cctx;
+        this.ucast = ucast;
+        this.affColId = affColId;
+    }
+
+    /**
+     * @param firstRow First row.
+     * @param lastRow Last row.
+     * @return Affinity key or {@code null}.
+     */
+    private Object getAffinityKey(SearchRow firstRow, SearchRow lastRow) {
+        if (affColId == COL_NOT_EXISTS)
+            return null;
+
+        if (firstRow == null || lastRow == null)
+            return null;
+
+        Value affKeyFirst = firstRow.getValue(affColId);
+        Value affKeyLast = lastRow.getValue(affColId);
+
+        if (affKeyFirst != null && equal(affKeyFirst, affKeyLast))
+            return affKeyFirst == ValueNull.INSTANCE ? GridH2IndexBase.EXPLICIT_NULL : affKeyFirst.getObject();
+
+        if (idx.getTable().rowDescriptor().isKeyColumn(affColId))
+            return null;
+
+        // Try to extract affinity key from primary key.
+        Value pkFirst = firstRow.getValue(KEY_COL);
+        Value pkLast = lastRow.getValue(KEY_COL);
+
+        if (pkFirst == ValueNull.INSTANCE || pkLast == ValueNull.INSTANCE)
+            return GridH2IndexBase.EXPLICIT_NULL;
+
+        if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast))
+            return null;
+
+        Object pkAffKeyFirst = cctx.affinity().affinityKey(pkFirst.getObject());
+        Object pkAffKeyLast = cctx.affinity().affinityKey(pkLast.getObject());
+
+        if (pkAffKeyFirst == null || pkAffKeyLast == null)
+            throw new CacheException("Cache key without affinity key.");
+
+        if (pkAffKeyFirst.equals(pkAffKeyLast))
+            return pkAffKeyFirst;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
+    @Override public boolean addSearchRows(SearchRow firstRow, SearchRow lastRow) {
+        if (qctx == null || findCalled) {
+            if (qctx == null) {
+                // It is the first call after query begin (may be after reuse),
+                // reinitialize query context and result.
+                qctx = GridH2QueryContext.get();
+                res = new ArrayList<>();
+
+                assert qctx != null;
+                assert !findCalled;
+            }
+            else {
+                // Cleanup after the previous lookup phase.
+                assert batchLookupId != 0;
+
+                findCalled = false;
+                qctx.putStreams(batchLookupId, null);
+                res.clear();
+            }
+
+            // Reinitialize for the next lookup phase.
+            batchLookupId = qctx.nextBatchLookupId();
+            rangeStreams = new HashMap<>();
+        }
+
+        Object affKey = getAffinityKey(firstRow, lastRow);
+
+        boolean locQry = localQuery();
+
+        List<SegmentKey> segmentKeys;
+
+        if (affKey != null) {
+            // Affinity key is provided.
+            if (affKey == GridH2IndexBase.EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything.
+                return false;
+
+            segmentKeys = F.asList(rangeSegment(affKey, locQry));
+        }
+        else {
+            // Affinity key is not provided or is not the same in upper and lower bounds, we have to broadcast.
+            if (broadcastSegments == null)
+                broadcastSegments = broadcastSegments(locQry);
+
+            segmentKeys = broadcastSegments;
+        }
+
+        if (locQry && segmentKeys.isEmpty())
+            return false; // Nothing to do
+
+        assert !F.isEmpty(segmentKeys) : segmentKeys;
+
+        final int rangeId = res.size();
+
+        // Create messages.
+        GridH2RowMessage first = idx.toSearchRowMessage(firstRow);
+        GridH2RowMessage last = idx.toSearchRowMessage(lastRow);
+
+        // Range containing upper and lower bounds.
+        GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last);
+
+        // Add range to every message of every participating node.
+        for (int i = 0; i < segmentKeys.size(); i++) {
+            SegmentKey segmentKey = segmentKeys.get(i);
+            assert segmentKey != null;
+
+            RangeStream stream = rangeStreams.get(segmentKey);
+
+            List<GridH2RowRangeBounds> bounds;
+
+            if (stream == null) {
+                stream = new RangeStream(cctx.kernalContext(), idx, qctx, segmentKey.node());
+
+                stream.request(createRequest(qctx, batchLookupId, segmentKey.segmentId()));
+                stream.request().bounds(bounds = new ArrayList<>());
+
+                rangeStreams.put(segmentKey, stream);
+            }
+            else
+                bounds = stream.request().bounds();
+
+            bounds.add(rangeBounds);
+
+            // If at least one node will have a full batch then we are ok.
+            if (bounds.size() >= qctx.pageSize())
+                batchFull = true;
+        }
+
+        Cursor cur;
+
+        if (segmentKeys.size() == 1)
+            cur = new UnicastCursor(rangeId, rangeStreams.get(F.first(segmentKeys)));
+        else
+            cur = new BroadcastCursor(idx, rangeId, segmentKeys, rangeStreams);
+
+        res.add(new DoneFuture<>(cur));
+
+        return true;
+    }
+
+    /**
+     * Null-safe equality check for two H2 values based on the compare mode of the index database.
+     *
+     * @param v1 First value.
+     * @param v2 Second value.
+     * @return {@code true} If both references are the same (including two {@code null}s) or both values are
+     *      not {@code null} and compare as equal.
+     */
+    private boolean equal(Value v1, Value v2) {
+        if (v1 == v2)
+            return true;
+
+        if (v1 == null || v2 == null)
+            return false;
+
+        return v1.compareTypeSafe(v2, idx.getDatabase().getCompareMode()) == 0;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the {@code batchFull} flag. The flag is raised while search rows are added, as soon as the request
+     * of at least one segment has collected {@code qctx.pageSize()} or more range bounds, and it is dropped by
+     * {@link #find()} and by the after-query {@link #reset(boolean)}.
+     */
+    @Override public boolean isBatchFull() {
+        return batchFull;
+    }
+
+    /**
+     * Checks the distributed join mode of the current query context. The query context must be set.
+     *
+     * @return {@code True} if the mode is {@code LOCAL_ONLY}, i.e. local query execution is enforced.
+     */
+    private boolean localQuery() {
+        assert qctx != null : "Missing query context: " + this;
+
+        return LOCAL_ONLY == qctx.distributedJoinMode();
+    }
+
+    /**
+     * Registers the collected range streams in the query context under the current batch lookup ID and starts
+     * every one of them. Does nothing when no streams were collected.
+     */
+    private void startStreams() {
+        if (!rangeStreams.isEmpty()) {
+            qctx.putStreams(batchLookupId, rangeStreams);
+
+            // Start streaming.
+            for (RangeStream rangeStream : rangeStreams.values())
+                rangeStream.start();
+
+            return;
+        }
+
+        // No streams were created, so there must be no cursors either.
+        assert res.isEmpty();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Drops the batch-full flag, remembers that the lookup was performed, starts all collected range streams
+     * and returns the internal list of cursor futures (the list itself, not a copy).
+     */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+    @Override public List<Future<Cursor>> find() {
+        batchFull = false;
+        findCalled = true;
+
+        startStreams();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset(boolean beforeQry) {
+        if (beforeQry || qctx == null) // Query context can be null if addSearchRows was never called.
+            return;
+
+        assert batchLookupId != 0;
+
+        // Do cleanup after the query run.
+        qctx.putStreams(batchLookupId, null);
+        qctx = null; // The same query can be reused multiple times for different query contexts.
+        batchLookupId = 0;
+
+        rangeStreams = Collections.emptyMap();
+        broadcastSegments = null;
+        batchFull = false;
+        findCalled = false;
+        res = Collections.emptyList();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getPlanSQL() {
+        return ucast ? "unicast" : "broadcast";
+    }
+
+    /**
+     * @param affKeyObj Affinity key.
+     * @param isLocalQry Local query flag.
+     * @return Segment key for Affinity key.
+     */
+    public SegmentKey rangeSegment(Object affKeyObj, boolean isLocalQry) {
+        assert affKeyObj != null && affKeyObj != GridH2IndexBase.EXPLICIT_NULL : affKeyObj;
+
+        ClusterNode node;
+
+        int partition = cctx.affinity().partition(affKeyObj);
+
+        if (isLocalQry) {
+            if (qctx.partitionsMap() != null) {
+                // If we have explicit partitions map, we have to use it to calculate affinity node.
+                UUID nodeId = qctx.nodeForPartition(partition, cctx);
+
+                if(!cctx.localNodeId().equals(nodeId))
+                    return null; // Prevent remote index call for local queries.
+            }
+
+            if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, qctx.topologyVersion()))
+                return null;
+
+            node = cctx.localNode();
+        }
+        else{
+            if (qctx.partitionsMap() != null) {
+                // If we have explicit partitions map, we have to use it to calculate affinity node.
+                UUID nodeId = qctx.nodeForPartition(partition, cctx);
+
+                node = cctx.discovery().node(nodeId);
+            }
+            else // Get primary node for current topology version.
+                node = cctx.affinity().primaryByKey(affKeyObj, qctx.topologyVersion());
+
+            if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
+                throw H2Utils.retryException("Failed to get primary node by key for range segment.");
+        }
+
+        return new SegmentKey(node, idx.segmentForPartition(partition));
+    }
+
+    /**
+     * @param isLocalQry Local query flag.
+     * @return Collection of nodes for broadcasting.
+     */
+    public List<SegmentKey> broadcastSegments(boolean isLocalQry) {
+        Map<UUID, int[]> partMap = qctx.partitionsMap();
+
+        List<ClusterNode> nodes;
+
+        if (isLocalQry) {
+            if (partMap != null && !partMap.containsKey(cctx.localNodeId()))
+                return Collections.emptyList(); // Prevent remote index call for local queries.
+
+            nodes = Collections.singletonList(cctx.localNode());
+        }
+        else {
+            if (partMap == null)
+                nodes = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+            else {
+                nodes = new ArrayList<>(partMap.size());
+
+                for (UUID nodeId : partMap.keySet()) {
+                    ClusterNode node = cctx.kernalContext().discovery().node(nodeId);
+
+                    if (node == null)
+                        throw H2Utils.retryException("Failed to get node by ID during broadcast [" +
+                            "nodeId=" + nodeId + ']');
+
+                    nodes.add(node);
+                }
+            }
+
+            if (F.isEmpty(nodes))
+                throw H2Utils.retryException("Failed to collect affinity nodes during broadcast [" +
+                    "cacheName=" + cctx.name() + ']');
+        }
+
+        int segmentsCount = idx.segmentsCount();
+
+        List<SegmentKey> res = new ArrayList<>(nodes.size() * segmentsCount);
+
+        for (ClusterNode node : nodes) {
+            for (int seg = 0; seg < segmentsCount; seg++)
+                res.add(new SegmentKey(node, seg));
+        }
+
+        return res;
+    }
+
+    /**
+     * @param qctx Query context.
+     * @param batchLookupId Batch lookup ID.
+     * @param segmentId Segment ID.
+     * @return Index range request.
+     */
+    public static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId, int segmentId) {
+        GridH2IndexRangeRequest req = new GridH2IndexRangeRequest();
+
+        req.originNodeId(qctx.originNodeId());
+        req.queryId(qctx.queryId());
+        req.originSegmentId(qctx.segment());
+        req.segment(segmentId);
+        req.batchLookupId(batchLookupId);
+
+        return req;
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeSource.java
new file mode 100644 (file)
index 0000000..8cce83d
--- /dev/null
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static java.util.Collections.emptyIterator;
+
+/**
+ * Bounds iterator.
+ */
+public class RangeSource {
+    /** Index. */
+    private final GridH2IndexBase idx;
+
+    /** */
+    private Iterator<GridH2RowRangeBounds> boundsIter;
+
+    /** */
+    private int curRangeId = -1;
+
+    /** */
+    private final int segment;
+
+    /** */
+    private final BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter;
+
+    /** Iterator. */
+    private Iterator<GridH2Row> iter = emptyIterator();
+
+    /**
+     * @param bounds Bounds.
+     * @param segment Segment.
+     * @param filter Filter.
+     */
+    public RangeSource(
+        GridH2IndexBase idx,
+        Iterable<GridH2RowRangeBounds> bounds,
+        int segment,
+        BPlusTree.TreeRowClosure<GridH2SearchRow, GridH2Row> filter
+    ) {
+        this.idx = idx;
+        this.segment = segment;
+        this.filter = filter;
+
+        boundsIter = bounds.iterator();
+    }
+
+    /**
+     * @return {@code true} If there are more rows in this source.
+     */
+    public boolean hasMoreRows() throws IgniteCheckedException {
+        return boundsIter.hasNext() || iter.hasNext();
+    }
+
+    /**
+     * @param maxRows Max allowed rows.
+     * @return Range.
+     */
+    public GridH2RowRange next(int maxRows) {
+        assert maxRows > 0 : maxRows;
+
+        for (; ; ) {
+            if (iter.hasNext()) {
+                // Here we are getting last rows from previously partially fetched range.
+                List<GridH2RowMessage> rows = new ArrayList<>();
+
+                GridH2RowRange nextRange = new GridH2RowRange();
+
+                nextRange.rangeId(curRangeId);
+                nextRange.rows(rows);
+
+                do {
+                    rows.add(H2Utils.toRowMessage(iter.next()));
+                }
+                while (rows.size() < maxRows && iter.hasNext());
+
+                if (iter.hasNext())
+                    nextRange.setPartial();
+                else
+                    iter = emptyIterator();
+
+                return nextRange;
+            }
+
+            iter = emptyIterator();
+
+            if (!boundsIter.hasNext()) {
+                boundsIter = emptyIterator();
+
+                return null;
+            }
+
+            GridH2RowRangeBounds bounds = boundsIter.next();
+
+            curRangeId = bounds.rangeId();
+
+            iter = idx.findForSegment(bounds, segment, filter);
+
+            if (!iter.hasNext()) {
+                // We have to return empty range here.
+                GridH2RowRange emptyRange = new GridH2RowRange();
+
+                emptyRange.rangeId(curRangeId);
+
+                return emptyRange;
+            }
+        }
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/RangeStream.java
new file mode 100644 (file)
index 0000000..0684089
--- /dev/null
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.index.Cursor;
+import org.h2.result.Row;
+import org.h2.value.Value;
+
+import javax.cache.CacheException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Collections.emptyIterator;
+import static java.util.Collections.singletonList;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
+import static org.h2.result.Row.MEMORY_CALCULATE;
+
+/**
+ * Per node range stream: sends index range requests to a single node and exposes the rows of the returned
+ * ranges through the {@link #next(int)} / {@link #get(int)} pair.
+ */
+public class RangeStream {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Index. */
+    private final GridH2IndexBase idx;
+
+    /** Query context. */
+    private final GridH2QueryContext qctx;
+
+    /** Node the requests are sent to. */
+    private final ClusterNode node;
+
+    /** Current request; set to {@code null} after the last expected range has been received. */
+    private GridH2IndexRangeRequest req;
+
+    /** Number of ranges which are not completely received yet. */
+    private int remainingRanges;
+
+    /** Received responses, added by {@link #onResponse(GridH2IndexRangeResponse)}. */
+    private final BlockingQueue<GridH2IndexRangeResponse> respQueue = new LinkedBlockingQueue<>();
+
+    /** Ranges of the last taken response which are not consumed yet. */
+    private Iterator<GridH2RowRange> ranges = emptyIterator();
+
+    /** Cursor over the rows of the current range. */
+    private Cursor cursor = GridH2Cursor.EMPTY;
+
+    /** ID of the range the cursor belongs to, {@code -1} until the first range is taken. */
+    private int cursorRangeId = -1;
+
+    /**
+     * @param ctx Kernal context.
+     * @param idx Index.
+     * @param qctx Query context.
+     * @param node Node.
+     */
+    public RangeStream(GridKernalContext ctx, GridH2IndexBase idx, GridH2QueryContext qctx, ClusterNode node) {
+        this.ctx = ctx;
+        this.idx = idx;
+        this.node = node;
+        this.qctx = qctx;
+    }
+
+    /**
+     * Start streaming: remembers how many ranges are expected (one per bounds of the current request) and
+     * sends the request to the node. The request with at least one bounds must be set beforehand.
+     */
+    public void start() {
+        remainingRanges = req.bounds().size();
+
+        assert remainingRanges > 0;
+
+        idx.send(singletonList(node), req);
+    }
+
+    /**
+     * Puts the received response into the queue of this stream.
+     *
+     * @param msg Response.
+     */
+    public void onResponse(GridH2IndexRangeResponse msg) {
+        respQueue.add(msg);
+    }
+
+    /**
+     * @param req Current request.
+     */
+    public void request(GridH2IndexRangeRequest req) {
+        this.req = req;
+    }
+
+    /**
+     * @return Current request.
+     */
+    public GridH2IndexRangeRequest request() {
+        return req;
+    }
+
+    /**
+     * Waits for the next response from the node. The queue is polled in 500 ms steps; on every step it is
+     * checked that the query is not cleared, the local node is not stopping and the remote node is alive.
+     * <p>
+     * On {@code STATUS_OK} the counter of remaining ranges is updated and, if more ranges are expected, the
+     * next page is requested before the response is returned. On {@code STATUS_NOT_FOUND} the request which
+     * still carries bounds is sent again after a pause growing with the attempt number, until 30 seconds have
+     * passed since the start of waiting.
+     * <p>
+     * NOTE(review): the interrupted status of the thread is not restored when {@code InterruptedException} is
+     * converted into a retry exception, and only the message of {@code IgniteInterruptedCheckedException} is
+     * kept - confirm that this is intended.
+     *
+     * @return Response with {@code STATUS_OK} status.
+     */
+    private GridH2IndexRangeResponse awaitForResponse() {
+        assert remainingRanges > 0;
+
+        final long start = U.currentTimeMillis();
+
+        for (int attempt = 0;; attempt++) {
+            if (qctx.isCleared())
+                throw H2Utils.retryException("Query is cancelled.");
+
+            if (ctx.isStopping())
+                throw H2Utils.retryException("Local node is stopping.");
+
+            GridH2IndexRangeResponse res;
+
+            try {
+                res = respQueue.poll(500, TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException ignored) {
+                throw H2Utils.retryException("Interrupted while waiting for reply.");
+            }
+
+            if (res != null) {
+                switch (res.status()) {
+                    case STATUS_OK:
+                        List<GridH2RowRange> ranges0 = res.ranges();
+
+                        remainingRanges -= ranges0.size();
+
+                        if (ranges0.get(ranges0.size() - 1).isPartial())
+                            remainingRanges++; // The partial range is not finished yet.
+
+                        if (remainingRanges > 0) {
+                            if (req.bounds() != null) // Follow-up requests are sent without bounds.
+                                req = DistributedLookupBatch.createRequest(qctx, req.batchLookupId(), req.segment());
+
+                            // Prefetch next page.
+                            idx.send(singletonList(node), req);
+                        }
+                        else
+                            req = null;
+
+                        return res;
+
+                    case STATUS_NOT_FOUND:
+                        if (req == null || req.bounds() == null) // We have already received the first response.
+                            throw H2Utils.retryException("Failure on remote node.");
+
+                        if (U.currentTimeMillis() - start > 30_000)
+                            throw H2Utils.retryException("Timeout reached.");
+
+                        try {
+                            U.sleep(20 * attempt);
+                        }
+                        catch (IgniteInterruptedCheckedException e) {
+                            throw new IgniteInterruptedException(e.getMessage());
+                        }
+
+                        // Retry to send the request once more after some time.
+                        idx.send(singletonList(node), req);
+
+                        break;
+
+                    case STATUS_ERROR:
+                        throw new CacheException(res.error());
+
+                    default:
+                        throw new IllegalStateException();
+                }
+            }
+
+            if (!ctx.discovery().alive(node))
+                throw H2Utils.retryException("Node has left topology: " + node.id());
+        }
+    }
+
+    /**
+     * Moves the stream to the next row of the requested range, taking further ranges and waiting for further
+     * responses when the current ones are exhausted.
+     *
+     * @param rangeId Requested range ID.
+     * @return {@code true} If next row for the requested range was found, {@code false} if the stream is
+     *      already positioned on a range with a greater ID or no more ranges are expected.
+     */
+    public boolean next(final int rangeId) {
+        for (;;) {
+            if (rangeId == cursorRangeId) {
+                if (cursor.next())
+                    return true;
+            }
+            else if (rangeId < cursorRangeId)
+                return false;
+
+            cursor = GridH2Cursor.EMPTY;
+
+            while (!ranges.hasNext()) {
+                if (remainingRanges == 0) {
+                    ranges = emptyIterator();
+
+                    return false;
+                }
+
+                ranges = awaitForResponse().ranges().iterator();
+            }
+
+            GridH2RowRange range = ranges.next();
+
+            cursorRangeId = range.rangeId();
+
+            if (!F.isEmpty(range.rows())) {
+                final Iterator<GridH2RowMessage> it = range.rows().iterator();
+
+                if (it.hasNext()) {
+                    cursor = new GridH2Cursor(new Iterator<Row>() {
+                        @Override public boolean hasNext() {
+                            return it.hasNext();
+                        }
+
+                        @Override public Row next() {
+                            // Lazily convert messages into real rows.
+                            return toRow(it.next());
+                        }
+
+                        @Override public void remove() {
+                            throw new UnsupportedOperationException();
+                        }
+                    });
+                }
+            }
+        }
+    }
+
+    /**
+     * Converts a row message into a row created by the database of the index.
+     *
+     * @param msg Message.
+     * @return Row or {@code null} if the message is {@code null}.
+     */
+    private Row toRow(GridH2RowMessage msg) {
+        if (msg == null)
+            return null;
+
+        List<GridH2ValueMessage> vals = msg.values();
+
+        assert !F.isEmpty(vals) : vals;
+
+        Value[] vals0 = new Value[vals.size()];
+
+        for (int i = 0; i < vals0.length; i++) {
+            try {
+                vals0[i] = vals.get(i).value(ctx);
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        return idx.getDatabase().createRow(vals0, MEMORY_CALCULATE);
+    }
+
+    /**
+     * Returns the row the cursor is positioned on. The stream must be positioned on the requested range.
+     *
+     * @param rangeId Requested range ID.
+     * @return Current row.
+     */
+    public Row get(int rangeId) {
+        assert rangeId == cursorRangeId;
+
+        return cursor.get();
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SegmentKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SegmentKey.java
new file mode 100644 (file)
index 0000000..9bdbab4
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ * Segment key.
+ */
+public class SegmentKey {
+    /** */
+    private final ClusterNode node;
+
+    /** */
+    private final int segmentId;
+
+    /**
+     * Constructor.
+     *
+     * @param node Node.
+     * @param segmentId Segment ID.
+     */
+    public SegmentKey(ClusterNode node, int segmentId) {
+        assert node != null;
+
+        this.node = node;
+        this.segmentId = segmentId;
+    }
+
+    /**
+     * @return Node.
+     */
+    public ClusterNode node() {
+        return node;
+    }
+
+    /**
+     * @return Segment ID.
+     */
+    public int segmentId() {
+        return segmentId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        SegmentKey key = (SegmentKey)o;
+
+        return segmentId == key.segmentId && node.id().equals(key.node.id());
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = node.hashCode();
+
+        res = 31 * res + segmentId;
+
+        return res;
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SourceKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/SourceKey.java
new file mode 100644 (file)
index 0000000..9cf7629
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import java.util.UUID;
+
+/**
+ * Key for source.
+ */
+public class SourceKey {
+    /** */
+    private final UUID ownerId;
+
+    /** */
+    private final int segmentId;
+
+    /** */
+    private final int batchLookupId;
+
+    /**
+     * @param ownerId Owner node ID.
+     * @param segmentId Index segment ID.
+     * @param batchLookupId Batch lookup ID.
+     */
+    public SourceKey(UUID ownerId, int segmentId, int batchLookupId) {
+        this.ownerId = ownerId;
+        this.segmentId = segmentId;
+        this.batchLookupId = batchLookupId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (!(o instanceof SourceKey))
+            return false;
+
+        SourceKey srcKey = (SourceKey)o;
+
+        return batchLookupId == srcKey.batchLookupId && segmentId == srcKey.segmentId &&
+            ownerId.equals(srcKey.ownerId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int hash = ownerId.hashCode();
+
+        hash = 31 * hash + segmentId;
+        hash = 31 * hash + batchLookupId;
+
+        return hash;
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/UnicastCursor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/UnicastCursor.java
new file mode 100644 (file)
index 0000000..13e2bcf
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt.join;
+
+import org.h2.index.Cursor;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+
+/**
+ * Simple cursor from a single node.
+ */
+public class UnicastCursor implements Cursor {
+    /** */
+    private final int rangeId;
+
+    /** */
+    private final RangeStream stream;
+
+    /**
+     * @param rangeId Range ID.
+     * @param stream Stream.
+     */
+    public UnicastCursor(int rangeId, RangeStream stream) {
+        assert stream != null;
+
+        this.rangeId = rangeId;
+        this.stream = stream;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean next() {
+        return stream.next(rangeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Row get() {
+        return stream.get(rangeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public SearchRow getSearchRow() {
+        return get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean previous() {
+        throw new UnsupportedOperationException();
+    }
+}
index 165ec0c..fcb3424 100644 (file)
@@ -53,7 +53,7 @@ import org.h2.command.dml.SelectUnion;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.isCollocated;
+import static org.apache.ignite.internal.processors.query.h2.opt.join.CollocationModel.isCollocated;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst.TRUE;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.AVG;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.CAST;
index b69ee78..2e611de 100644 (file)
@@ -75,7 +75,7 @@ import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.ResultSetEnlistFuture;
 import org.apache.ignite.internal.processors.query.h2.UpdateResult;
-import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
+import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
@@ -111,8 +111,8 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUE
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
-import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
-import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
+import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.distributedJoinMode;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
index d90331c..7f874fe 100644 (file)
@@ -107,7 +107,7 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.checkAc
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.mvccEnabled;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.tx;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
-import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier;
 
index f2bd25a..69230f7 100644 (file)
@@ -34,8 +34,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -290,7 +290,7 @@ public class RetryCauseMessageSelfTest extends GridCommonAbstractTest {
                         reservations.put(grpKey, new GridReservable() {
 
                             @Override public boolean reserve() {
-                                throw new GridH2RetryException("test retry exception");
+                                throw H2Utils.retryException("test retry exception");
                             }
 
                             @Override public void release() {