IGNITE-7346 Enable Ignite cache events per cache
authorAleksey Plekhanov <plehanov.alex@gmail.com>
Thu, 8 Feb 2018 16:51:35 +0000 (19:51 +0300)
committerAnton Vinogradov <av@apache.org>
Thu, 8 Feb 2018 16:51:35 +0000 (19:51 +0300)
Signed-off-by: Anton Vinogradov <av@apache.org>
23 files changed:
modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheEventAbstractTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedUnloadEventsSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedEventDisabledSelfTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryEvtsDisabledSelfTest.java [new file with mode: 0644]
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQueryEvtsDisabledSelfTest.java [new file with mode: 0644]
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQuerySelfTest.java
modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/CacheConfigurationParityTest.cs

index 3a40824..d41e687 100644 (file)
@@ -181,6 +181,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Default query parallelism. */
     public static final int DFLT_QUERY_PARALLELISM = 1;
 
+    /** Default value for events disabled flag. */
+    public static final boolean DFLT_EVENTS_DISABLED = false;
+
     /** Cache name. */
     private String name;
 
@@ -361,6 +364,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Cache key configuration. */
     private CacheKeyConfiguration[] keyCfg;
 
+    /** Events disabled. */
+    private boolean evtsDisabled = DFLT_EVENTS_DISABLED;
+
     /** Empty constructor (all values are initialized to their defaults). */
     public CacheConfiguration() {
         /* No-op. */
@@ -453,6 +459,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         storeConcurrentLoadAllThreshold = cc.getStoreConcurrentLoadAllThreshold();
         maxQryIterCnt = cc.getMaxQueryIteratorsCount();
         sqlOnheapCache = cc.isSqlOnheapCacheEnabled();
+        evtsDisabled = cc.isEventsDisabled();
     }
 
     /**
@@ -2184,6 +2191,27 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     }
 
     /**
+     * Checks whether events are disabled for this cache.
+     *
+     * @return Events disabled flag.
+     */
+    public Boolean isEventsDisabled() {
+        return evtsDisabled;
+    }
+
+    /**
+     * Sets events disabled flag.
+     *
+     * @param evtsDisabled Events disabled flag.
+     * @return {@code this} for chaining.
+     */
+    public CacheConfiguration<K, V> setEventsDisabled(boolean evtsDisabled) {
+        this.evtsDisabled = evtsDisabled;
+
+        return this;
+    }
+
+    /**
      * Gets cache key configuration.
      *
      * @return Cache key configuration.
index b980d5c..fd9ea1f 100644 (file)
@@ -407,7 +407,7 @@ public class CacheGroupContext {
         for (int i = 0; i < caches.size(); i++) {
             GridCacheContext cctx = caches.get(i);
 
-            if (cctx.recordEvent(type)) {
+            if (!cctx.config().isEventsDisabled() && cctx.recordEvent(type)) {
                 cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
                     cctx.localNode(),
                     "Cache rebalancing event.",
@@ -434,14 +434,15 @@ public class CacheGroupContext {
         for (int i = 0; i < caches.size(); i++) {
             GridCacheContext cctx = caches.get(i);
 
-            cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
-                cctx.localNode(),
-                "Cache unloading event.",
-                EVT_CACHE_REBALANCE_PART_UNLOADED,
-                part,
-                null,
-                0,
-                0));
+            if (!cctx.config().isEventsDisabled())
+                cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
+                    cctx.localNode(),
+                    "Cache unloading event.",
+                    EVT_CACHE_REBALANCE_PART_UNLOADED,
+                    part,
+                    null,
+                    0,
+                    0));
         }
     }
 
@@ -472,20 +473,21 @@ public class CacheGroupContext {
         for (int i = 0; i < caches.size(); i++) {
             GridCacheContext cctx = caches.get(i);
 
-            cctx.events().addEvent(part,
-                key,
-                evtNodeId,
-                (IgniteUuid)null,
-                null,
-                type,
-                newVal,
-                hasNewVal,
-                oldVal,
-                hasOldVal,
-                null,
-                null,
-                null,
-                keepBinary);
+            if (!cctx.config().isEventsDisabled())
+                cctx.events().addEvent(part,
+                    key,
+                    evtNodeId,
+                    (IgniteUuid)null,
+                    null,
+                    type,
+                    newVal,
+                    hasNewVal,
+                    oldVal,
+                    hasOldVal,
+                    null,
+                    null,
+                    null,
+                    keepBinary);
         }
     }
 
index a9692f8..3c5cf1e 100644 (file)
@@ -372,7 +372,8 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
     public boolean isRecordable(int type) {
         GridCacheContext cctx0 = cctx;
 
-        return cctx0 != null && cctx0.userCache() && cctx0.gridEvents().isRecordable(type);
+        return cctx0 != null && cctx0.userCache() && cctx0.gridEvents().isRecordable(type)
+            && !cctx0.config().isEventsDisabled();
     }
 
     /** {@inheritDoc} */
index e63aab6..7a982ce 100644 (file)
@@ -903,7 +903,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
                             if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
                                 removeEntry(cached);
 
-                                if (rec) {
+                                if (rec && !hld.cctx.config().isEventsDisabled()) {
                                     hld.cctx.events().addEvent(cached.partition(),
                                         cached.key(),
                                         ctx.localNodeId(),
index fe8e054..ea19c2f 100644 (file)
@@ -68,7 +68,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -77,7 +76,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
 import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate;
 import org.apache.ignite.internal.processors.datastructures.SetItemKey;
@@ -577,7 +575,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     throw new IllegalStateException("Should never be called.");
 
                 case SCAN:
-                    if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+                    if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
                         cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
                             cctx.localNode(),
                             "Scan query executed.",
@@ -598,7 +596,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     break;
 
                 case TEXT:
-                    if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+                    if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
                         cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
                             cctx.localNode(),
                             "Full text query executed.",
@@ -672,7 +670,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         }
 
         if (qry.type() == SQL_FIELDS) {
-            if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+            if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
                 cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
                     cctx.localNode(),
                     "SQL fields query executed.",
@@ -704,7 +702,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         else {
             assert qry.type() == SPI : "Unexpected query type: " + qry.type();
 
-            if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+            if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
                 cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
                     cctx.localNode(),
                     "SPI query executed.",
@@ -912,7 +910,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             final boolean statsEnabled = cctx.statisticsEnabled();
 
-            final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+            final boolean readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
             try {
                 // Preparing query closures.
@@ -1154,7 +1152,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 final boolean statsEnabled = cctx.statisticsEnabled();
 
-                final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+                final boolean readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
                 CacheObjectContext objCtx = cctx.cacheObjectContext();
 
@@ -1424,7 +1422,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             final ClusterNode locNode = cctx.localNode();
             final UUID subjId = qry.subjectId();
 
-            if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+            if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
                 cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
                     locNode,
                     "Scan query executed.",
@@ -2957,7 +2955,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             statsEnabled = locNode && cctx.statisticsEnabled();
 
-            readEvt = locNode && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+            readEvt = locNode && cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
             if(readEvt){
                 taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash());
index f0cd7ca..9ff4623 100644 (file)
@@ -365,7 +365,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
             @Override public void onExecution() {
-                if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                if (cctx != null && cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
                     ctx.event().record(new CacheQueryExecutedEvent<>(
                         ctx.discovery().localNode(),
                         "Continuous query executed.",
index 1e131ef..f40c077 100644 (file)
@@ -331,7 +331,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         boolean initialized = false;
 
-        boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+        boolean recordIgniteEvt = primary && !internal && cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
         for (CacheContinuousQueryListener lsnr : lsnrCol.values()) {
             if (preload && !lsnr.notifyExisting())
@@ -398,7 +398,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean primary = cctx.affinity().primaryByPartition(cctx.localNode(), e.partition(), AffinityTopologyVersion.NONE);
 
         if (cctx.isReplicated() || primary) {
-            boolean recordIgniteEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+            boolean recordIgniteEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
             boolean initialized = false;
 
index cbb6b9b..f666cdd 100644 (file)
@@ -2037,7 +2037,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                         idx.querySqlFields(schemaName, qry, keepBinary, failOnMultipleStmts, cancel);
 
                     if (cctx != null)
-                        sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx.name());
+                        sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx);
 
                     return res;
                 }
@@ -2179,7 +2179,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                         sendQueryExecutedEvent(
                             qry.getSql(),
                             qry.getArgs(),
-                            cctx.name());
+                            cctx);
 
                         if (cctx.config().getQueryParallelism() > 1) {
                             qry.setDistributedJoins(true);
@@ -2343,14 +2343,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param sqlQry Sql query.
      * @param params Params.
      */
-    private void sendQueryExecutedEvent(String sqlQry, Object[] params, String cacheName) {
-        if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+    private void sendQueryExecutedEvent(String sqlQry, Object[] params, GridCacheContext<?, ?> cctx) {
+        if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
             ctx.event().record(new CacheQueryExecutedEvent<>(
                 ctx.discovery().localNode(),
                 "SQL query executed.",
                 EVT_CACHE_QUERY_EXECUTED,
                 CacheQueryType.SQL.name(),
-                cacheName,
+                cctx.name(),
                 null,
                 sqlQry,
                 null,
index 8f601f8..50c0579 100644 (file)
@@ -744,6 +744,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
         cfg.setName(DYNAMIC_CACHE_NAME);
         cfg.setCacheMode(CacheMode.REPLICATED);
 
+        // This cache will not fire any cache events.
+        CacheConfiguration cfgEvtsDisabled = new CacheConfiguration(cfg);
+
+        cfgEvtsDisabled.setName("DynamicCacheEvtsDisabled");
+        cfgEvtsDisabled.setEventsDisabled(true);
+
         final CountDownLatch[] starts = new CountDownLatch[nodeCount()];
         final CountDownLatch[] stops = new CountDownLatch[nodeCount()];
 
@@ -781,6 +787,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
             ignite(i).events().localListen(lsnrs[i], EventType.EVTS_CACHE_LIFECYCLE);
         }
 
+        IgniteCache<Object, Object> cacheEvtsDisabled = ignite(0).createCache(cfgEvtsDisabled);
         IgniteCache<Object, Object> cache = ignite(0).createCache(cfg);
 
         try {
@@ -788,6 +795,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
                 start.await();
         }
         finally {
+            cacheEvtsDisabled.destroy();
             cache.destroy();
         }
 
index 2202339..55190ad 100644 (file)
@@ -65,6 +65,9 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
     /** */
     private static volatile int gridCnt;
 
+    /** Event listener. */
+    protected static volatile EventListener evtLsnr;
+
     /**
      * @return {@code True} if partitioned.
      */
@@ -78,8 +81,10 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
 
         gridCnt = gridCount();
 
+        evtLsnr = createEventListener();
+
         for (int i = 0; i < gridCnt; i++)
-            grid(i).events().localListen(new TestEventListener(partitioned()), EVTS_CACHE);
+            grid(i).events().localListen(evtLsnr, EVTS_CACHE);
     }
 
     /** {@inheritDoc} */
@@ -89,7 +94,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
         if (TEST_INFO)
             info("Called beforeTest() callback.");
 
-        TestEventListener.reset();
+        evtLsnr.reset();
     }
 
     /** {@inheritDoc} */
@@ -97,17 +102,24 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
         if (TEST_INFO)
             info("Called afterTest() callback.");
 
-        TestEventListener.stopListen();
+        evtLsnr.stopListen();
 
         try {
             super.afterTest();
         }
         finally {
-            TestEventListener.listen();
+            evtLsnr.listen();
         }
     }
 
     /**
+     * Create event listener.
+     */
+    protected EventListener createEventListener() {
+        return new TestEventListener(partitioned());
+    }
+
+    /**
      * Waits for event count on all nodes.
      *
      * @param gridIdx Grid index.
@@ -117,7 +129,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
     private void waitForEvents(int gridIdx, IgniteBiTuple<Integer, Integer>... evtCnts) throws Exception {
         if (!F.isEmpty(evtCnts))
             try {
-                TestEventListener.waitForEventCount(evtCnts);
+                evtLsnr.waitForEventCount(evtCnts);
             }
             catch (IgniteCheckedException e) {
                 printEventCounters(gridIdx, evtCnts);
@@ -136,7 +148,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
         for (IgniteBiTuple<Integer, Integer> t : expCnts) {
             Integer evtType = t.get1();
 
-            int actCnt = TestEventListener.eventCount(evtType);
+            int actCnt = evtLsnr.eventCount(evtType);
 
             info("Event [evtType=" + evtType + ", expCnt=" + t.get2() + ", actCnt=" + actCnt + ']');
         }
@@ -178,13 +190,13 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
             }
             finally {
                 // This call is mainly required to correctly clear event futures.
-                TestEventListener.reset();
+                evtLsnr.reset();
 
                 clearCaches();
 
                 // This call is required for the second time to reset counters for
                 // the previous call.
-                TestEventListener.reset();
+                evtLsnr.reset();
             }
         }
     }
@@ -649,20 +661,52 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
     }
 
     /**
+     * Event listener interface.
+     */
+    protected static interface EventListener extends IgnitePredicate<Event> {
+        /**
+         * Start listen.
+         */
+        void listen();
+
+        /**
+         * Stop listen.
+         */
+        void stopListen();
+
+        /**
+         * Gets events count by type.
+         *
+         * @param type Type.
+         */
+        int eventCount(int type);
+
+        /**
+         * Reset event counters.
+         */
+        void reset();
+
+        /**
+         * @param evtCnts Event counters.
+         */
+        void waitForEventCount(IgniteBiTuple<Integer, Integer>... evtCnts) throws IgniteCheckedException;
+    }
+
+    /**
      * Local event listener.
      */
-    private static class TestEventListener implements IgnitePredicate<Event> {
+    private static class TestEventListener implements EventListener {
         /** Events count map. */
-        private static ConcurrentMap<Integer, AtomicInteger> cntrs = new ConcurrentHashMap<>();
+        private ConcurrentMap<Integer, AtomicInteger> cntrs = new ConcurrentHashMap<>();
 
         /** Event futures. */
-        private static Collection<EventTypeFuture> futs = new GridConcurrentHashSet<>();
+        private Collection<EventTypeFuture> futs = new GridConcurrentHashSet<>();
 
         /** */
-        private static volatile boolean listen = true;
+        private volatile boolean listen = true;
 
         /** */
-        private static boolean partitioned;
+        private boolean partitioned;
 
         /**
          * @param p Partitioned flag.
@@ -674,14 +718,14 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
         /**
          *
          */
-        private static void listen() {
+        public void listen() {
             listen = true;
         }
 
         /**
          *
          */
-        private static void stopListen() {
+        public void stopListen() {
             listen = false;
         }
 
@@ -689,7 +733,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
          * @param type Event type.
          * @return Count.
          */
-        static int eventCount(int type) {
+        public int eventCount(int type) {
             assert type > 0;
 
             AtomicInteger cntr = cntrs.get(type);
@@ -700,7 +744,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
         /**
          * Reset listener.
          */
-        static void reset() {
+        public void reset() {
             cntrs.clear();
 
             futs.clear();
@@ -734,7 +778,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
          * @param evtCnts Array of tuples with values: V1 - event type, V2 - expected event count.
          * @throws IgniteCheckedException If failed to wait.
          */
-        private static void waitForEventCount(IgniteBiTuple<Integer, Integer>... evtCnts)
+        public void waitForEventCount(IgniteBiTuple<Integer, Integer>... evtCnts)
             throws IgniteCheckedException {
             if (F.isEmpty(evtCnts))
                 return;
@@ -837,4 +881,4 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe
             return S.toString(EventTypeFuture.class, this, "evtName", U.gridEventName(evtType));
         }
     }
-}
\ No newline at end of file
+}
index 8dc2835..d6cea58 100644 (file)
@@ -50,6 +50,9 @@ public class GridCachePartitionedUnloadEventsSelfTest extends GridCommonAbstract
     /** */
     private static final int EVENTS_COUNT = 40;
 
+    /** Default cache name with cache events disabled. */
+    private static final String DEFAULT_CACHE_NAME_EVTS_DISABLED = DEFAULT_CACHE_NAME + "EvtsDisabled";
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -58,7 +61,14 @@ public class GridCachePartitionedUnloadEventsSelfTest extends GridCommonAbstract
         disco.setIpFinder(ipFinder);
         cfg.setDiscoverySpi(disco);
 
-        cfg.setCacheConfiguration(cacheConfiguration());
+        CacheConfiguration<?, ?> ccfg = cacheConfiguration();
+
+        CacheConfiguration<?, ?> ccfgEvtsDisabled = new CacheConfiguration<>(ccfg);
+
+        ccfgEvtsDisabled.setName(DEFAULT_CACHE_NAME_EVTS_DISABLED);
+        ccfgEvtsDisabled.setEventsDisabled(true);
+
+        cfg.setCacheConfiguration(ccfg, ccfgEvtsDisabled);
 
         return cfg;
     }
@@ -83,16 +93,21 @@ public class GridCachePartitionedUnloadEventsSelfTest extends GridCommonAbstract
 
         Collection<Integer> allKeys = new ArrayList<>(EVENTS_COUNT);
 
-        IgniteCache<Integer, String> cache = g1.cache(DEFAULT_CACHE_NAME);
+        IgniteCache<Integer, String> cache1 = g1.cache(DEFAULT_CACHE_NAME);
+        IgniteCache<Integer, String> cache2 = g1.cache(DEFAULT_CACHE_NAME_EVTS_DISABLED);
 
         for (int i = 0; i < EVENTS_COUNT; i++) {
-            cache.put(i, "val");
+            cache1.put(i, "val");
+
+            // Events should not be fired by this put.
+            cache2.put(i, "val");
+
             allKeys.add(i);
         }
 
         Ignite g2 = startGrid("g2");
 
-        awaitPartitionMapExchange();
+        awaitPartitionMapExchange(true, true, null);
 
         Map<ClusterNode, Collection<Object>> keysMap = g1.affinity(DEFAULT_CACHE_NAME).mapKeysToNodes(allKeys);
         Collection<Object> g2Keys = keysMap.get(g2.cluster().localNode());
@@ -156,4 +171,4 @@ public class GridCachePartitionedUnloadEventsSelfTest extends GridCommonAbstract
             assertEquals(g.cluster().localNode().id(), unloadEvt.node().id());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedEventDisabledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedEventDisabledSelfTest.java
new file mode 100644 (file)
index 0000000..84cbbd1
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.replicated;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Tests events.
+ */
+public class GridCacheReplicatedEventDisabledSelfTest extends GridCacheReplicatedEventSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
+        return super.cacheConfiguration(igniteInstanceName).setEventsDisabled(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected EventListener createEventListener() {
+        return new TestEventListener();
+    }
+
+    /**
+     * Test event listener.
+     */
+    private static class TestEventListener implements EventListener {
+        /** {@inheritDoc} */
+        @Override public void listen() {
+            /* No-op. */
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stopListen() {
+            /* No-op. */
+        }
+
+        /** {@inheritDoc} */
+        @Override public int eventCount(int type) {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            /* No-op. */
+        }
+
+        /** {@inheritDoc} */
+        @Override public void waitForEventCount(IgniteBiTuple<Integer, Integer>... evtCnts) throws IgniteCheckedException {
+            /* No-op. */
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(Event evt) {
+            fail("Cache events are disabled");
+
+            return false;
+        }
+    }
+}
index e90b7e1..f114091 100644 (file)
@@ -242,6 +242,13 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
 
             GridCacheAdapter<Integer, String> cache1 = ((IgniteKernal)g1).internalCache(DEFAULT_CACHE_NAME);
 
+            // Cache rebalancing events should not be fired for this cache.
+            CacheConfiguration ccfg = cacheConfiguration(((IgniteKernal)g1).getInstanceName())
+                .setName(DEFAULT_CACHE_NAME + "_evts_disabled")
+                .setEventsDisabled(true);
+
+            g1.getOrCreateCache(ccfg);
+
             cache1.getAndPut(1, "val1");
             cache1.getAndPut(2, "val2");
 
index 674b6a2..503f39a 100644 (file)
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCa
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicApiTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicOpSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicStoreSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedEventDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedEventSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedEvictionEventSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedGetAndTransformStoreSelfTest;
@@ -109,6 +110,7 @@ public class IgniteCacheTestSuite3 extends TestSuite {
         suite.addTestSuite(GridCacheReplicatedGetAndTransformStoreSelfTest.class);
         suite.addTestSuite(GridCacheReplicatedAtomicGetAndTransformStoreSelfTest.class);
         suite.addTestSuite(GridCacheReplicatedEventSelfTest.class);
+        suite.addTestSuite(GridCacheReplicatedEventDisabledSelfTest.class);
         suite.addTestSuite(GridCacheReplicatedSynchronousCommitTest.class);
 
         suite.addTestSuite(GridCacheReplicatedLockSelfTest.class);
index 52b641e..5fa90c1 100644 (file)
@@ -620,8 +620,7 @@ public class GridMapQueryExecutor {
                 }
             }
 
-            qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null,
-                MapQueryLazyWorker.currentWorker());
+            qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, MapQueryLazyWorker.currentWorker());
 
             if (nodeRess.put(reqId, segmentId, qr) != null)
                 throw new IllegalStateException();
@@ -660,7 +659,7 @@ public class GridMapQueryExecutor {
                 // Run queries.
                 int qryIdx = 0;
 
-                boolean evt = mainCctx != null && ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
+                boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
 
                 for (GridCacheSqlQuery qry : qrys) {
                     ResultSet rs = null;
@@ -819,7 +818,7 @@ public class GridMapQueryExecutor {
             GridCacheContext<?, ?> mainCctx =
                 !F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0)) : null;
 
-            boolean evt = local && mainCctx != null && ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
+            boolean evt = local && mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
 
             if (evt) {
                 ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -1073,4 +1072,4 @@ public class GridMapQueryExecutor {
     public int registeredLazyWorkers() {
         return lazyWorkers.size();
     }
-}
\ No newline at end of file
+}
index e54c784..beeb054 100644 (file)
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
+import java.lang.reflect.Field;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
 import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
@@ -31,12 +37,6 @@ import org.h2.result.ResultInterface;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
-import java.lang.reflect.Field;
-import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 
 /**
@@ -70,7 +70,7 @@ class MapQueryResult {
     private final ResultSet rs;
 
     /** */
-    private final String cacheName;
+    private final GridCacheContext<?, ?> cctx;
 
     /** */
     private final GridCacheSqlQuery qry;
@@ -101,16 +101,16 @@ class MapQueryResult {
 
     /**
      * @param rs Result set.
-     * @param cacheName Cache name.
+     * @param cctx Cache context.
      * @param qrySrcNodeId Query source node.
      * @param qry Query.
      * @param params Query params.
      * @param lazyWorker Lazy worker.
      */
-    MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable String cacheName,
+    MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable GridCacheContext cctx,
         UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) {
         this.h2 = h2;
-        this.cacheName = cacheName;
+        this.cctx = cctx;
         this.qry = qry;
         this.params = params;
         this.qrySrcNodeId = qrySrcNodeId;
@@ -179,7 +179,7 @@ class MapQueryResult {
         if (closed)
             return true;
 
-        boolean readEvt = cacheName != null && h2.kernalContext().event().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+        boolean readEvt = cctx != null && cctx.name() != null && cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
         page++;
 
@@ -222,7 +222,7 @@ class MapQueryResult {
                     "SQL fields query result set row read.",
                     EVT_CACHE_QUERY_OBJECT_READ,
                     CacheQueryType.SQL.name(),
-                    cacheName,
+                    cctx.name(),
                     null,
                     qry.query(),
                     null,
index 99f1966..45f9c1f 100644 (file)
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
+import java.sql.ResultSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.jetbrains.annotations.Nullable;
 
-import java.sql.ResultSet;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-
 /**
  * Mapper query results.
  */
@@ -43,7 +43,7 @@ class MapQueryResults {
     private final GridQueryCancel[] cancels;
 
     /** */
-    private final String cacheName;
+    private final GridCacheContext<?, ?> cctx;
 
     /** Lazy worker. */
     private final MapQueryLazyWorker lazyWorker;
@@ -56,15 +56,15 @@ class MapQueryResults {
      *
      * @param qryReqId Query request ID.
      * @param qrys Number of queries.
-     * @param cacheName Cache name.
+     * @param cctx Cache context.
      * @param lazyWorker Lazy worker (if any).
      */
     @SuppressWarnings("unchecked")
-    MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys, @Nullable String cacheName,
+    MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys, @Nullable GridCacheContext<?, ?> cctx,
         @Nullable MapQueryLazyWorker lazyWorker) {
         this.h2 = h2;
         this.qryReqId = qryReqId;
-        this.cacheName = cacheName;
+        this.cctx = cctx;
         this.lazyWorker = lazyWorker;
 
         results = new AtomicReferenceArray<>(qrys);
@@ -108,7 +108,7 @@ class MapQueryResults {
      * @param rs Result set.
      */
     void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
-        MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params, lazyWorker);
+        MapQueryResult res = new MapQueryResult(h2, rs, cctx, qrySrcNodeId, q, params, lazyWorker);
 
         if (lazyWorker != null)
             lazyWorker.result(res);
index 97ef8e5..f2fef29 100644 (file)
@@ -1466,8 +1466,10 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
      * @throws Exception If failed.
      */
     private void checkSqlQueryEvents() throws Exception {
-        final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount());
         final IgniteCache<Integer, Integer> cache = jcache(Integer.class, Integer.class);
+        final boolean evtsDisabled = cache.getConfiguration(CacheConfiguration.class).isEventsDisabled();
+        final CountDownLatch execLatch = new CountDownLatch(evtsDisabled ? 0 :
+            cacheMode() == REPLICATED ? 1 : gridCount());
 
         IgnitePredicate[] lsnrs = new IgnitePredicate[gridCount()];
 
@@ -1476,6 +1478,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
                 @Override public boolean apply(Event evt) {
                     assert evt instanceof CacheQueryExecutedEvent;
 
+                    if (evtsDisabled)
+                        fail("Cache events are disabled");
+
                     CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt;
 
                     assertEquals(cache.getName(), qe.cacheName());
@@ -1517,9 +1522,11 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
      */
     public void testScanQueryEvents() throws Exception {
         final Map<Integer, Integer> map = new ConcurrentHashMap8<>();
-        final CountDownLatch latch = new CountDownLatch(10);
-        final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount());
         final IgniteCache<Integer, Integer> cache = jcache(Integer.class, Integer.class);
+        final boolean evtsDisabled = cache.getConfiguration(CacheConfiguration.class).isEventsDisabled();
+        final CountDownLatch latch = new CountDownLatch(evtsDisabled ? 0 : 10);
+        final CountDownLatch execLatch = new CountDownLatch(evtsDisabled ? 0 :
+            cacheMode() == REPLICATED ? 1 : gridCount());
 
         IgnitePredicate[] objReadLsnrs = new IgnitePredicate[gridCount()];
         IgnitePredicate[] qryExecLsnrs = new IgnitePredicate[gridCount()];
@@ -1529,6 +1536,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
                 @Override public boolean apply(Event evt) {
                     assert evt instanceof CacheQueryReadEvent;
 
+                    if (evtsDisabled)
+                        fail("Cache events are disabled");
+
                     CacheQueryReadEvent<Integer, Integer> qe = (CacheQueryReadEvent<Integer, Integer>)evt;
 
                     assertEquals(SCAN.name(), qe.queryType());
@@ -1555,6 +1565,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
                 @Override public boolean apply(Event evt) {
                     assert evt instanceof CacheQueryExecutedEvent;
 
+                    if (evtsDisabled)
+                        fail("Cache events are disabled");
+
                     CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt;
 
                     assertEquals(SCAN.name(), qe.queryType());
@@ -1593,10 +1606,12 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
             assert latch.await(1000, MILLISECONDS);
             assert execLatch.await(1000, MILLISECONDS);
 
-            assertEquals(10, map.size());
+            if (!evtsDisabled) {
+                assertEquals(10, map.size());
 
-            for (int i = 10; i < 20; i++)
-                assertEquals(i, map.get(i).intValue());
+                for (int i = 10; i < 20; i++)
+                    assertEquals(i, map.get(i).intValue());
+            }
         }
         finally {
             for (int i = 0; i < gridCount(); i++) {
@@ -1611,9 +1626,11 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
      */
     public void testTextQueryEvents() throws Exception {
         final Map<UUID, Person> map = new ConcurrentHashMap8<>();
-        final CountDownLatch latch = new CountDownLatch(2);
-        final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount());
         final IgniteCache<UUID, Person> cache = jcache(UUID.class, Person.class);
+        final boolean evtsDisabled = cache.getConfiguration(CacheConfiguration.class).isEventsDisabled();
+        final CountDownLatch latch = new CountDownLatch(evtsDisabled ? 0 : 2);
+        final CountDownLatch execLatch = new CountDownLatch(evtsDisabled ? 0 :
+            cacheMode() == REPLICATED ? 1 : gridCount());
 
         IgnitePredicate[] objReadLsnrs = new IgnitePredicate[gridCount()];
         IgnitePredicate[] qryExecLsnrs = new IgnitePredicate[gridCount()];
@@ -1623,6 +1640,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
                 @Override public boolean apply(Event evt) {
                     assert evt instanceof CacheQueryReadEvent;
 
+                    if (evtsDisabled)
+                        fail("Cache events are disabled");
+
                     CacheQueryReadEvent<UUID, Person> qe = (CacheQueryReadEvent<UUID, Person>)evt;
 
                     assertEquals(FULL_TEXT.name(), qe.queryType());
@@ -1649,6 +1669,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
                 @Override public boolean apply(Event evt) {
                     assert evt instanceof CacheQueryExecutedEvent;
 
+                    if (evtsDisabled)
+                        fail("Cache events are disabled");
+
                     CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt;
 
                     assertEquals(FULL_TEXT.name(), qe.queryType());
@@ -1686,10 +1709,12 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
             assert latch.await(1000, MILLISECONDS);
             assert execLatch.await(1000, MILLISECONDS);
 
-            assertEquals(2, map.size());
+            if (!evtsDisabled) {
+                assertEquals(2, map.size());
 
-            assertEquals("Bob White", map.get(k1).name());
-            assertEquals("Tom White", map.get(k2).name());
+                assertEquals("Bob White", map.get(k1).name());
+                assertEquals("Tom White", map.get(k2).name());
+            }
         }
         finally {
             for (int i = 0; i < gridCount(); i++) {
@@ -1703,8 +1728,10 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
      * @throws Exception If failed.
      */
     public void testFieldsQueryEvents() throws Exception {
-        final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount());
         final IgniteCache<UUID, Person> cache = jcache(UUID.class, Person.class);
+        final boolean evtsDisabled = cache.getConfiguration(CacheConfiguration.class).isEventsDisabled();
+        final CountDownLatch execLatch = new CountDownLatch(evtsDisabled ? 0 :
+            cacheMode() == REPLICATED ? 1 : gridCount());
 
         IgnitePredicate[] qryExecLsnrs = new IgnitePredicate[gridCount()];
 
@@ -1713,6 +1740,9 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
                 @Override public boolean apply(Event evt) {
                     assert evt instanceof CacheQueryExecutedEvent;
 
+                    if (evtsDisabled)
+                        fail("Cache events are disabled");
+
                     CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt;
 
                     assertEquals(cache.getName(), qe.cacheName());
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryEvtsDisabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCachePartitionedQueryEvtsDisabledSelfTest.java
new file mode 100644 (file)
index 0000000..3ca2492
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Tests for partitioned cache queries with events disabled.
+ */
+public class IgniteCachePartitionedQueryEvtsDisabledSelfTest extends IgniteCachePartitionedQuerySelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration() {
+        return super.cacheConfiguration().setEventsDisabled(true);
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQueryEvtsDisabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedQueryEvtsDisabledSelfTest.java
new file mode 100644 (file)
index 0000000..09b7f0d
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.replicated;
+
+import org.apache.ignite.configuration.CacheConfiguration;
+
+/**
+ * Tests replicated query with disabled events.
+ */
+public class IgniteCacheReplicatedQueryEvtsDisabledSelfTest extends IgniteCacheReplicatedQuerySelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration() {
+        return super.cacheConfiguration().setEventsDisabled(true);
+    }
+}
index ec1a16d..13942c2 100644 (file)
@@ -38,9 +38,9 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
-import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractQuerySelfTest;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -241,20 +241,7 @@ public class IgniteCacheReplicatedQuerySelfTest extends IgniteCacheAbstractQuery
      * @throws Exception If test failed.
      */
     public void testDistributedQuery() throws Exception {
-        int keyCnt = 4;
-
-        final CountDownLatch latch = new CountDownLatch(keyCnt * 2);
-
-        IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                latch.countDown();
-
-                return true;
-            }
-        };
-
-        ignite2.events().localListen(lsnr, EventType.EVT_CACHE_OBJECT_PUT);
-        ignite3.events().localListen(lsnr, EventType.EVT_CACHE_OBJECT_PUT);
+        final int keyCnt = 4;
 
         Transaction tx = ignite1.transactions().txStart();
 
@@ -272,7 +259,11 @@ public class IgniteCacheReplicatedQuerySelfTest extends IgniteCacheAbstractQuery
             throw e;
         }
 
-        latch.await();
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return cache2.size() == keyCnt && cache3.size() == keyCnt;
+            }
+        }, 5000);
 
         QueryCursor<Cache.Entry<CacheKey, CacheValue>> qry =
             cache1.query(new SqlQuery<CacheKey, CacheValue>(CacheValue.class, "val > 1 and val < 4"));
@@ -586,4 +577,4 @@ public class IgniteCacheReplicatedQuerySelfTest extends IgniteCacheAbstractQuery
             return S.toString(CacheValue.class, this);
         }
     }
-}
\ No newline at end of file
+}
index 6295d8d..f004453 100644 (file)
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheD
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedQueryCancelSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQueryP2PEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQueryEvtsDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQueryP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedSnapshotEnabledQuerySelfTest;
@@ -96,6 +97,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryROSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQueryEvtsDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQueryP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.index.DuplicateKeyValueClassesSelfTest;
@@ -104,6 +106,7 @@ import org.apache.ignite.internal.processors.cache.index.DynamicIndexServerBasic
 import org.apache.ignite.internal.processors.cache.index.DynamicIndexServerCoordinatorBasicSelfTest;
 import org.apache.ignite.internal.processors.cache.index.DynamicIndexServerNodeFIlterBasicSelfTest;
 import org.apache.ignite.internal.processors.cache.index.DynamicIndexServerNodeFilterCoordinatorBasicSelfTest;
+import org.apache.ignite.internal.processors.cache.index.H2ConnectionLeaksSelfTest;
 import org.apache.ignite.internal.processors.cache.index.H2DynamicColumnsClientBasicSelfTest;
 import org.apache.ignite.internal.processors.cache.index.H2DynamicColumnsServerBasicSelfTest;
 import org.apache.ignite.internal.processors.cache.index.H2DynamicColumnsServerCoordinatorBasicSelfTest;
@@ -122,7 +125,6 @@ import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexingComple
 import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexingComplexServerTransactionalPartitionedTest;
 import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexingComplexServerTransactionalReplicatedTest;
 import org.apache.ignite.internal.processors.cache.index.H2DynamicTableSelfTest;
-import org.apache.ignite.internal.processors.cache.index.H2ConnectionLeaksSelfTest;
 import org.apache.ignite.internal.processors.cache.index.H2RowCachePageEvictionTest;
 import org.apache.ignite.internal.processors.cache.index.H2RowCacheSelfTest;
 import org.apache.ignite.internal.processors.cache.index.LongIndexNameTest;
@@ -137,28 +139,28 @@ import org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDe
 import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest;
 import org.apache.ignite.internal.processors.client.ClientConnectorConfigurationValidationSelfTest;
-import org.apache.ignite.internal.processors.query.IgniteCachelessQueriesSelfTest;
 import org.apache.ignite.internal.processors.database.baseline.IgniteStableBaselineBinObjFieldsQuerySelfTest;
+import org.apache.ignite.internal.processors.query.IgniteCachelessQueriesSelfTest;
+import org.apache.ignite.internal.processors.query.IgniteQueryDedicatedPoolTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlDefaultValueTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlDistributedJoinSelfTest;
-import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest;
-import org.apache.ignite.internal.processors.query.IgniteSqlParameterizedQueryTest;
-import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest;
-import org.apache.ignite.internal.processors.query.IgniteQueryDedicatedPoolTest;
-import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlKeyValueFieldsTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlNotNullConstraintTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlParameterizedQueryTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlRoutingTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexMultiNodeSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
 import org.apache.ignite.internal.processors.query.LazyQuerySelfTest;
 import org.apache.ignite.internal.processors.query.MultipleStatementsSqlQuerySelfTest;
 import org.apache.ignite.internal.processors.query.SqlSchemaSelfTest;
 import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest;
 import org.apache.ignite.internal.processors.query.h2.GridH2IndexingOffheapSelfTest;
+import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest;
 import org.apache.ignite.internal.processors.query.h2.IgniteSqlQueryMinMaxTest;
 import org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest;
 import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest;
@@ -236,11 +238,13 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheLocalAtomicQuerySelfTest.class);
         suite.addTestSuite(IgniteCacheReplicatedQuerySelfTest.class);
         suite.addTestSuite(IgniteCacheReplicatedQueryP2PDisabledSelfTest.class);
+        suite.addTestSuite(IgniteCacheReplicatedQueryEvtsDisabledSelfTest.class);
         suite.addTestSuite(IgniteCachePartitionedQuerySelfTest.class);
         suite.addTestSuite(IgniteCachePartitionedSnapshotEnabledQuerySelfTest.class);
         suite.addTestSuite(IgniteCacheAtomicQuerySelfTest.class);
         suite.addTestSuite(IgniteCacheAtomicNearEnabledQuerySelfTest.class);
         suite.addTestSuite(IgniteCachePartitionedQueryP2PDisabledSelfTest.class);
+        suite.addTestSuite(IgniteCachePartitionedQueryEvtsDisabledSelfTest.class);
 
         suite.addTestSuite(IgniteCacheQueryIndexSelfTest.class);
         suite.addTestSuite(IgniteCacheCollocatedQuerySelfTest.class);
index 0022e96..94f52ce 100644 (file)
@@ -58,7 +58,8 @@ namespace Apache.Ignite.Core.Tests.ApiParity
         {
             "NodeFilter",  // IGNITE-2890
             "EvictionPolicyFactory",  // IGNITE-6649,
-            "isSqlOnheapCacheEnabled"  // IGNITE-7379
+            "isSqlOnheapCacheEnabled",  // IGNITE-7379
+            "isEventsDisabled"  // IGNITE-7346
         };
 
         /// <summary>