IGNITE-5804 ScanQuery transformer should be applied to all result pages - Fixes ...
authorSlava Koptilin <slava.koptilin@gmail.com>
Thu, 15 Feb 2018 11:57:40 +0000 (14:57 +0300)
committerAlexey Goncharuk <alexey.goncharuk@gmail.com>
Thu, 15 Feb 2018 11:57:40 +0000 (14:57 +0300)
Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java

index 6f21bd6..f310511 100644 (file)
@@ -534,8 +534,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @throws IgniteCheckedException In case of error.
      */
     @SuppressWarnings("unchecked")
-    private QueryResult<K, V> executeQuery(GridCacheQueryAdapter<?> qry,
-        @Nullable Object[] args, boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object rcpt)
+    private QueryResult<K, V> executeQuery(GridCacheQueryAdapter<?> qry, @Nullable Object[] args,
+        IgniteClosure transformer, boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object rcpt)
         throws IgniteCheckedException {
         if (qry.type() == null) {
             assert !loc;
@@ -589,7 +589,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                             taskName));
                     }
 
-                    iter = scanIterator(qry, false);
+                    iter = scanIterator(qry, transformer, false);
 
                     break;
 
@@ -794,12 +794,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
     /**
      * @param qry Query.
+     * @param transformer Transformer.
      * @param locNode Local node.
      * @return Full-scan row iterator.
      * @throws IgniteCheckedException If failed to get iterator.
      */
     @SuppressWarnings({"unchecked"})
-    private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> qry, boolean locNode)
+    private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> qry, IgniteClosure transformer,
+        boolean locNode)
         throws IgniteCheckedException {
         final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
 
@@ -847,7 +849,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer);
             }
 
-            return new ScanQueryIterator(it, qry, topVer, locPart, keyValFilter, locNode, cctx, log);
+            return new ScanQueryIterator(it, qry, topVer, locPart, keyValFilter, transformer, locNode, cctx, log);
         }
         catch (IgniteCheckedException | RuntimeException e) {
             closeScanFilter(keyValFilter);
@@ -1119,11 +1121,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash());
 
-                IgniteSpiCloseableIterator<IgniteBiTuple<K, V>> iter;
+                IgniteSpiCloseableIterator iter;
                 GridCacheQueryType type;
 
                 res = loc ?
-                    executeQuery(qry, qryInfo.arguments(), loc, qry.subjectId(), taskName,
+                    executeQuery(qry, qryInfo.arguments(), trans, loc, qry.subjectId(), taskName,
                         recipient(qryInfo.senderId(), qryInfo.requestId())) :
                     queryResult(qryInfo, taskName);
 
@@ -1162,142 +1164,113 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     if(!iter.hasNext())
                         break;
 
-                    IgniteBiTuple<K, V> row = iter.next();
+                    Object row0 = iter.next();
 
                     // Query is cancelled.
-                    if (row == null) {
+                    if (row0 == null) {
                         onPageReady(loc, qryInfo, null, true, null);
 
                         break;
                     }
 
-                    final K key = row.getKey();
-
-                    // Filter backups for SCAN queries, if it isn't partition scan.
-                    // Other types are filtered in indexing manager.
-                    if (!cctx.isReplicated() && qry.type() == SCAN && qry.partition() == null &&
-                        cctx.config().getCacheMode() != LOCAL && !incBackups &&
-                        !cctx.affinity().primaryByKey(cctx.localNode(), key, topVer)) {
-                        if (log.isDebugEnabled())
-                            log.debug("Ignoring backup element [row=" + row +
-                                ", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups +
-                                ", primary=" + cctx.affinity().primaryByKey(cctx.localNode(), key, topVer) + ']');
-
-                        continue;
-                    }
-
-                    V val = row.getValue();
-
-                    if (log.isDebugEnabled()) {
-                        ClusterNode primaryNode = cctx.affinity().primaryByKey(key,
-                            cctx.affinity().affinityTopologyVersion());
-
-                        log.debug(S.toString("Record",
-                            "key", key, true,
-                            "val", val, true,
-                            "incBackups", incBackups, false,
-                            "priNode", primaryNode != null ? U.id8(primaryNode.id()) : null, false,
-                            "node", U.id8(cctx.localNode().id()), false));
-                    }
-
-                    if (val == null) {
-                        if (log.isDebugEnabled())
-                            log.debug(S.toString("Unsuitable record value", "val", val, true));
-
-                        continue;
-                    }
-
-                    if (statsEnabled) {
-                        CacheMetricsImpl metrics = cctx.cache().metrics0();
+                    if (type == SCAN)
+                        // Scan iterator may return already transformed entry
+                        data.add(row0);
+                    else {
+                        IgniteBiTuple<K, V> row = (IgniteBiTuple<K, V>) row0;
 
-                        metrics.onRead(true);
+                        final K key = row.getKey();
 
-                        metrics.addGetTimeNanos(System.nanoTime() - start);
-                    }
+                        V val = row.getValue();
 
-                    K key0 = null;
-                    V val0 = null;
+                        if (log.isDebugEnabled()) {
+                            ClusterNode primaryNode = cctx.affinity().primaryByKey(key,
+                                cctx.affinity().affinityTopologyVersion());
 
-                    if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
-                        key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, qry.keepBinary(), false);
-                        val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, qry.keepBinary(), false);
-
-                        switch (type) {
-                            case SQL:
-                                cctx.gridEvents().record(new CacheQueryReadEvent<>(
-                                    cctx.localNode(),
-                                    "SQL query entry read.",
-                                    EVT_CACHE_QUERY_OBJECT_READ,
-                                    CacheQueryType.SQL.name(),
-                                    cctx.name(),
-                                    qry.queryClassName(),
-                                    qry.clause(),
-                                    null,
-                                    null,
-                                    qryInfo.arguments(),
-                                    qry.subjectId(),
-                                    taskName,
-                                    key0,
-                                    val0,
-                                    null,
-                                    null));
+                            log.debug(S.toString("Record",
+                                "key", key, true,
+                                "val", val, true,
+                                "incBackups", incBackups, false,
+                                "priNode", primaryNode != null ? U.id8(primaryNode.id()) : null, false,
+                                "node", U.id8(cctx.localNode().id()), false));
+                        }
 
-                                break;
+                        if (val == null) {
+                            if (log.isDebugEnabled())
+                                log.debug(S.toString("Unsuitable record value", "val", val, true));
 
-                            case TEXT:
-                                cctx.gridEvents().record(new CacheQueryReadEvent<>(
-                                    cctx.localNode(),
-                                    "Full text query entry read.",
-                                    EVT_CACHE_QUERY_OBJECT_READ,
-                                    CacheQueryType.FULL_TEXT.name(),
-                                    cctx.name(),
-                                    qry.queryClassName(),
-                                    qry.clause(),
-                                    null,
-                                    null,
-                                    null,
-                                    qry.subjectId(),
-                                    taskName,
-                                    key0,
-                                    val0,
-                                    null,
-                                    null));
+                            continue;
+                        }
 
-                                break;
+                        if (statsEnabled) {
+                            CacheMetricsImpl metrics = cctx.cache().metrics0();
 
-                            case SCAN:
-                                cctx.gridEvents().record(new CacheQueryReadEvent<>(
-                                    cctx.localNode(),
-                                    "Scan query entry read.",
-                                    EVT_CACHE_QUERY_OBJECT_READ,
-                                    CacheQueryType.SCAN.name(),
-                                    cctx.name(),
-                                    null,
-                                    null,
-                                    qry.scanFilter(),
-                                    null,
-                                    null,
-                                    qry.subjectId(),
-                                    taskName,
-                                    key0,
-                                    val0,
-                                    null,
-                                    null));
+                            metrics.onRead(true);
 
-                                break;
+                            metrics.addGetTimeNanos(System.nanoTime() - start);
                         }
-                    }
 
-                    if (rdc != null || trans != null) {
-                        if (key0 == null)
+                        K key0 = null;
+                        V val0 = null;
+
+                        if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
                             key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, qry.keepBinary(), false);
-                        if (val0 == null)
                             val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, qry.keepBinary(), false);
 
-                        Cache.Entry<K, V> entry = new CacheEntryImpl(key0, val0);
+                            switch (type) {
+                                case SQL:
+                                    cctx.gridEvents().record(new CacheQueryReadEvent<>(
+                                        cctx.localNode(),
+                                        "SQL query entry read.",
+                                        EVT_CACHE_QUERY_OBJECT_READ,
+                                        CacheQueryType.SQL.name(),
+                                        cctx.name(),
+                                        qry.queryClassName(),
+                                        qry.clause(),
+                                        null,
+                                        null,
+                                        qryInfo.arguments(),
+                                        qry.subjectId(),
+                                        taskName,
+                                        key0,
+                                        val0,
+                                        null,
+                                        null));
+
+                                    break;
+
+                                case TEXT:
+                                    cctx.gridEvents().record(new CacheQueryReadEvent<>(
+                                        cctx.localNode(),
+                                        "Full text query entry read.",
+                                        EVT_CACHE_QUERY_OBJECT_READ,
+                                        CacheQueryType.FULL_TEXT.name(),
+                                        cctx.name(),
+                                        qry.queryClassName(),
+                                        qry.clause(),
+                                        null,
+                                        null,
+                                        null,
+                                        qry.subjectId(),
+                                        taskName,
+                                        key0,
+                                        val0,
+                                        null,
+                                        null));
+
+                                    break;
+                            }
+                        }
 
-                        // Reduce.
                         if (rdc != null) {
+                            if (key0 == null)
+                                key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, qry.keepBinary(), false);
+                            if (val0 == null)
+                                val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, qry.keepBinary(), false);
+
+                            Cache.Entry<K, V> entry = new CacheEntryImpl(key0, val0);
+
+                            // Reduce.
                             if (!rdc.collect(entry) || !iter.hasNext()) {
                                 onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null);
 
@@ -1308,12 +1281,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                             else
                                 continue;
                         }
-
-                        data.add(trans != null ? trans.apply(entry) :
-                            !loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
+                        else
+                            data.add(!loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
                     }
-                    else
-                        data.add(!loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val));
 
                     if (!loc) {
                         if (++cnt == pageSize || !iter.hasNext()) {
@@ -1436,7 +1406,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     taskName));
             }
 
-            GridCloseableIterator it = scanIterator(qry, true);
+            IgniteClosure transformer = qry.transform();
+
+            injectResources(transformer);
+
+            GridCloseableIterator it = scanIterator(qry, transformer, true);
 
             updateStatistics = false;
 
@@ -1517,7 +1491,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
         if (exec) {
             try {
-                fut.onDone(executeQuery(qryInfo.query(), qryInfo.arguments(), false,
+                fut.onDone(executeQuery(qryInfo.query(), qryInfo.arguments(), qryInfo.transformer(), false,
                     qryInfo.query().subjectId(), taskName, recipient(qryInfo.senderId(), qryInfo.requestId())));
             }
             catch (Throwable e) {
@@ -2758,6 +2732,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * Creates user's predicate based scan query.
      *
      * @param filter Scan filter.
+     * @param trans Transformer.
      * @param part Partition.
      * @param keepBinary Keep binary flag.
      * @return Created query.
@@ -2925,12 +2900,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         /** */
         private IgniteCacheExpiryPolicy expiryPlc;
 
+        /** */
+        private final boolean locNode;
+
+        /** */
+        private final boolean incBackups;
+
         /**
          * @param it Iterator.
          * @param qry Query.
          * @param topVer Topology version.
          * @param locPart Local partition.
          * @param scanFilter Scan filter.
+         * @param transformer Transformer.
          * @param locNode Local node flag.
          * @param cctx Cache context.
          * @param log Logger.
@@ -2941,6 +2923,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             AffinityTopologyVersion topVer,
             GridDhtLocalPartition locPart,
             IgniteBiPredicate<K, V> scanFilter,
+            IgniteClosure transformer,
             boolean locNode,
             GridCacheContext cctx,
             IgniteLogger log) {
@@ -2950,10 +2933,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             this.scanFilter = scanFilter;
             this.cctx = cctx;
             this.log = log;
+            this.locNode = locNode;
+
+            incBackups = qry.includeBackups();
 
-            statsEnabled = locNode && cctx.statisticsEnabled();
+            statsEnabled = cctx.statisticsEnabled();
 
-            readEvt = locNode && cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+            readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ) &&
+                cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ);
 
             if(readEvt){
                 taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash());
@@ -2965,8 +2952,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             }
 
             // keep binary for remote scans if possible
-            keepBinary = (!locNode && scanFilter == null) || qry.keepBinary();
-            transform = qry.transform();
+            keepBinary = (!locNode && scanFilter == null && transformer == null && !readEvt) || qry.keepBinary();
+            transform = transformer;
             dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
             cache = dht != null ? dht : cctx.cache();
             objCtx = cctx.cacheObjectContext();
@@ -3020,7 +3007,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         private void advance() {
             long start = statsEnabled ? System.nanoTime() : 0L;
 
-            Object next = null;
+            Object next0 = null;
 
             while (it.hasNext()) {
                 CacheDataRow row = it.next();
@@ -3062,6 +3049,31 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 else
                     val = row.value();
 
+                // Filter backups for SCAN queries, if it isn't partition scan.
+                // Other types are filtered in indexing manager.
+                if (!cctx.isReplicated() && /*qry.partition()*/this.locPart == null &&
+                    cctx.config().getCacheMode() != LOCAL && !incBackups &&
+                    !cctx.affinity().primaryByKey(cctx.localNode(), key, topVer)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Ignoring backup element [row=" + row +
+                            ", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups +
+                            ", primary=" + cctx.affinity().primaryByKey(cctx.localNode(), key, topVer) + ']');
+
+                    continue;
+                }
+
+                if (log.isDebugEnabled()) {
+                    ClusterNode primaryNode = cctx.affinity().primaryByKey(key,
+                        cctx.affinity().affinityTopologyVersion());
+
+                    log.debug(S.toString("Record",
+                        "key", key, true,
+                        "val", val, true,
+                        "incBackups", incBackups, false,
+                        "priNode", primaryNode != null ? U.id8(primaryNode.id()) : null, false,
+                        "node", U.id8(cctx.localNode().id()), false));
+                }
+
                 if (val != null) {
                     K key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, keepBinary, false);
                     V val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, keepBinary, false);
@@ -3075,7 +3087,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     }
 
                     if (scanFilter == null || scanFilter.apply(key0, val0)) {
-                        if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
+                        if (readEvt) {
                             cctx.gridEvents().record(new CacheQueryReadEvent<>(
                                 cctx.localNode(),
                                 "Scan query entry read.",
@@ -3095,15 +3107,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                                 null));
                         }
 
-                        next = transform == null ? new CacheQueryEntry<>(key0, val0)
-                            : transform.apply(new CacheQueryEntry<>(key0, val0));
+                        if (transform != null)
+                            next0 = transform.apply(new CacheQueryEntry<>(key0, val0));
+                        else
+                            next0 = !locNode ? new GridCacheQueryResponseEntry<>(key0, val0):
+                                new CacheQueryEntry<>(key0, val0);
 
                         break;
                     }
                 }
             }
 
-            if ((this.next = next) == null && expiryPlc != null && dht != null) {
+            if ((this.next = next0) == null && expiryPlc != null && dht != null) {
                 dht.sendTtlUpdateRequest(expiryPlc);
 
                 expiryPlc = null;
index c378b6f..269ae71 100644 (file)
@@ -595,6 +595,85 @@ public class GridCacheQueryTransformerSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testPageSize() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        int numEntries = 10_000;
+        int pageSize = 3;
+
+        try {
+            for (int i = 0; i < numEntries; i++)
+                cache.put(i, new Value("str" + i, i));
+
+            IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer =
+                new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() {
+                    @Override public Integer apply(Cache.Entry<Integer, Value> e) {
+                        return e.getValue().idx;
+                    }
+                };
+
+            ScanQuery<Integer, Value> query = new ScanQuery<>();
+            query.setPageSize(pageSize);
+
+            List<Integer> res = cache.query(query, transformer).getAll();
+
+            assertEquals(numEntries, res.size());
+
+            Collections.sort(res);
+
+            for (int i = 0; i < numEntries; i++)
+                assertEquals(i, res.get(i).intValue());
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalInjection() throws Exception {
+        IgniteCache<Integer, Value> cache = grid().createCache("test-cache");
+
+        try {
+            for (int i = 0; i < 50; i++)
+                cache.put(i, new Value("str" + i, i * 100));
+
+            Collection<List<Boolean>> lists = grid().compute().broadcast(new IgniteCallable<List<Boolean>>() {
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                @Override public List<Boolean> call() throws Exception {
+                    IgniteClosure<Cache.Entry<Integer, Value>, Boolean> transformer =
+                        new IgniteClosure<Cache.Entry<Integer, Value>, Boolean>() {
+                            @IgniteInstanceResource
+                            Ignite ignite;
+
+                            @Override public Boolean apply(Cache.Entry<Integer, Value> e) {
+                                return ignite != null;
+                            }
+                        };
+
+                    return ignite.cache("test-cache").query(new ScanQuery<Integer, Value>().setLocal(true),
+                        transformer).getAll();
+                }
+            });
+
+            List<Boolean> res = new ArrayList<>(F.flatCollections(lists));
+
+            assertEquals(50, res.size());
+
+            for (int i = 0; i < 50; i++)
+                assertEquals(Boolean.TRUE, res.get(i));
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /**
      */
     private static class Value {
         /** */