IGNITE-10883 Fix and refactoring IgniteRebalanceOnCachesStoppingOrDestroyingTest...
authorDmitriyGovorukhin <dmitriy.govorukhin@gmail.com>
Tue, 25 Dec 2018 11:01:50 +0000 (14:01 +0300)
committerDmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
Thu, 10 Jan 2019 21:14:09 +0000 (00:14 +0300)
Signed-off-by: Dmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java

index 556af19..6c72258 100644 (file)
@@ -2486,7 +2486,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      *
      * @param cacheMap Map to add to.
      * @param cacheName Cache name.
-     * @param rich Node to add
+     * @param node Node to add
      */
     private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode node) {
         List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
index 937f1f0..83c548e 100644 (file)
@@ -507,9 +507,11 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
                     for (Integer grpId0 : session0.disabledGrps) {
                         CacheGroupContext grp = cctx.cache().cacheGroup(grpId0);
 
-                        assert grp != null;
+                        if (grp != null)
+                            grp.topology().ownMoving(topVer);
+                        else if (log.isDebugEnabled())
+                            log.debug("Cache group was destroyed before checkpoint finished, [grpId=" + grpId0 + ']');
 
-                        grp.topology().ownMoving(topVer);
                     }
 
                     cctx.exchange().refreshPartitions();
index 5c7f6c0..0ef2289 100644 (file)
@@ -19,16 +19,16 @@ package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -37,19 +37,15 @@ import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.MvccFeatureChecker;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
@@ -82,6 +78,9 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
     /** */
     private static final int REBALANCE_BATCH_SIZE = 50 * 1024;
 
+    /** Number of loaded keys in each cache. */
+    private static final int KEYS_SIZE = 3000;
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
@@ -112,12 +111,12 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
             .setDefaultTxTimeout(1000));
 
         cfg.setDataStorageConfiguration(
-                new DataStorageConfiguration()
-                        .setWalMode(WALMode.LOG_ONLY)
-                        .setDefaultDataRegionConfiguration(
-                                new DataRegionConfiguration()
-                                        .setPersistenceEnabled(true)
-                                        .setMaxSize(100L * 1024 * 1024)));
+            new DataStorageConfiguration()
+                .setWalMode(WALMode.LOG_ONLY)
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                        .setMaxSize(100L * 1024 * 1024)));
 
         return cfg;
     }
@@ -126,7 +125,23 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
      *
      */
     @Test
-    public void testStopCachesOnDeactivation() throws Exception {
+    public void testStopCachesOnDeactivationFirstGroup() throws Exception {
+        testStopCachesOnDeactivation(GROUP_1);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testStopCachesOnDeactivationSecondGroup() throws Exception {
+        testStopCachesOnDeactivation(GROUP_2);
+    }
+
+    /**
+     * @param groupName Group name.
+     * @throws Exception If failed.
+     */
+    private void testStopCachesOnDeactivation(String groupName) throws Exception {
         if (MvccFeatureChecker.forcedMvcc())
             fail("https://issues.apache.org/jira/browse/IGNITE-10582");
 
@@ -137,26 +152,58 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
             ig.cluster().active(true);
 
             return null;
-        });
+        }, groupName);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testDestroySpecificCachesInDifferentCacheGroupsFirstGroup() throws Exception {
+        testDestroySpecificCachesInDifferentCacheGroups(GROUP_1);
     }
 
     /**
      *
      */
     @Test
-    public void testDestroySpecificCachesInDifferentCacheGroups() throws Exception {
+    public void testDestroySpecificCachesInDifferentCacheGroupsSecondGroup() throws Exception {
+        testDestroySpecificCachesInDifferentCacheGroups(GROUP_2);
+    }
+
+    /**
+     * @param groupName Group name.
+     * @throws Exception If failed.
+     */
+    private void testDestroySpecificCachesInDifferentCacheGroups(String groupName) throws Exception {
         performTest(ig -> {
             ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3));
 
             return null;
-        });
+        }, groupName);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testDestroySpecificCacheAndCacheGroupFirstGroup() throws Exception {
+        testDestroySpecificCacheAndCacheGroup(GROUP_1);
     }
 
     /**
      *
      */
     @Test
-    public void testDestroySpecificCacheAndCacheGroup() throws Exception {
+    public void testDestroySpecificCacheAndCacheGroupSecondGroup() throws Exception {
+        testDestroySpecificCacheAndCacheGroup(GROUP_2);
+    }
+
+    /**
+     * @param groupName Group name.
+     * @throws Exception If failed.
+     */
+    private void testDestroySpecificCacheAndCacheGroup(String groupName) throws Exception {
         if (MvccFeatureChecker.forcedMvcc())
             fail("https://issues.apache.org/jira/browse/IGNITE-10582");
 
@@ -164,13 +211,13 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
             ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3, CACHE_4));
 
             return null;
-        });
+        }, groupName);
     }
 
     /**
      * @param testAction Action that trigger stop or destroy of caches.
      */
-    private void performTest(IgniteThrowableConsumer<Ignite, Void> testAction) throws Exception {
+    private void performTest(IgniteThrowableConsumer<Ignite, Void> testAction, String groupName) throws Exception {
         IgniteEx ig0 = (IgniteEx)startGrids(2);
 
         ig0.cluster().active(true);
@@ -179,13 +226,27 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
 
         loadData(ig0);
 
-        startGrid(1);
+        IgniteEx ig1 = startGrid(1);
+
+        RebalanceBlockingSPI commSpi = (RebalanceBlockingSPI)ig1.configuration().getCommunicationSpi();
 
-        runLoad(ig0);
+        // Complete all futures for groups that we don't need to wait.
+        commSpi.resumeRebalanceFutures.forEach((k, v) -> {
+            if (k != CU.cacheId(groupName))
+                v.onDone();
+        });
+
+        CountDownLatch latch = commSpi.suspendRebalanceInMiddleLatch.get(CU.cacheId(groupName));
+
+        assert latch != null;
+
+        // Await some middle point rebalance for group.
+        latch.await();
 
         testAction.accept(ig0);
 
-        U.sleep(1000);
+        // Resume rebalance after action performed.
+        commSpi.resumeRebalanceFutures.get(CU.cacheId(groupName)).onDone();
 
         awaitPartitionMapExchange(true, true, null, true);
 
@@ -197,22 +258,22 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
      */
     private void loadData(Ignite ig) {
         List<CacheConfiguration> configs = Stream.of(
-                F.t(CACHE_1, GROUP_1),
-                F.t(CACHE_2, GROUP_1),
-                F.t(CACHE_3, GROUP_2),
-                F.t(CACHE_4, GROUP_2)
+            F.t(CACHE_1, GROUP_1),
+            F.t(CACHE_2, GROUP_1),
+            F.t(CACHE_3, GROUP_2),
+            F.t(CACHE_4, GROUP_2)
         ).map(names -> new CacheConfiguration<>(names.get1())
-                .setGroupName(names.get2())
-                .setRebalanceBatchSize(REBALANCE_BATCH_SIZE)
-                .setCacheMode(CacheMode.REPLICATED)
-                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+            .setGroupName(names.get2())
+            .setRebalanceBatchSize(REBALANCE_BATCH_SIZE)
+            .setCacheMode(CacheMode.REPLICATED)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
         ).collect(Collectors.toList());
 
         ig.getOrCreateCaches(configs);
 
         configs.forEach(cfg -> {
             try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(cfg.getName())) {
-                for (int i = 0; i < 3_000; i++)
+                for (int i = 0; i < KEYS_SIZE; i++)
                     streamer.addData(i, new byte[1024]);
 
                 streamer.flush();
@@ -221,69 +282,43 @@ public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonA
     }
 
     /**
-     * @param ig Ignite instance.
-     */
-    private void runLoad(Ignite ig) throws Exception{
-        GridTestUtils.runMultiThreaded(new Runnable() {
-            @Override public void run() {
-                String cacheName = F.rand(CACHE_1, CACHE_2, CACHE_3, CACHE_4);
-
-                IgniteCache cache = ig.cache(cacheName);
-
-                for (int i = 0; i < 3_000; i++) {
-                    int idx = ThreadLocalRandom.current().nextInt(3_000);
-
-                    while (true) {
-                        try {
-                            cache.put(idx, new byte[1024]);
-
-                            break;
-                        }
-                        catch (Exception e) {
-                            MvccFeatureChecker.assertMvccWriteConflict(e);
-                        }
-                    }
-                }
-            }
-        }, 4, "load-thread");
-    }
-
-    /**
      *
      */
     private static class RebalanceBlockingSPI extends TcpCommunicationSpi {
-        /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
-            slowDownMessage(msg);
-
-            super.sendMessage(node, msg);
-
+        /** */
+        private final Map<Integer, GridFutureAdapter> resumeRebalanceFutures = new ConcurrentHashMap<>();
+
+        /** */
+        private final Map<Integer, CountDownLatch> suspendRebalanceInMiddleLatch = new ConcurrentHashMap<>();
+
+        /** */
+        RebalanceBlockingSPI() {
+            resumeRebalanceFutures.put(CU.cacheId(GROUP_1), new GridFutureAdapter());
+            resumeRebalanceFutures.put(CU.cacheId(GROUP_2), new GridFutureAdapter());
+            suspendRebalanceInMiddleLatch.put(CU.cacheId(GROUP_1), new CountDownLatch(3));
+            suspendRebalanceInMiddleLatch.put(CU.cacheId(GROUP_2), new CountDownLatch(3));
         }
 
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg,
-                                          IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
-            slowDownMessage(msg);
+        @Override protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) {
+            if (msg instanceof GridIoMessage &&
+                ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) {
+                GridDhtPartitionSupplyMessage msg0 = (GridDhtPartitionSupplyMessage)((GridIoMessage)msg).message();
 
-            super.sendMessage(node, msg, ackC);
-        }
+                CountDownLatch latch = suspendRebalanceInMiddleLatch.get(msg0.groupId());
 
-        /**
-         * @param msg Message.
-         */
-        private void slowDownMessage(Message msg) {
-            if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) {
-                int grpId = ((GridCacheGroupIdMessage)((GridIoMessage)msg).message()).groupId();
+                if (latch != null) {
+                    if (latch.getCount() > 0)
+                        latch.countDown();
+                    else {
+                        resumeRebalanceFutures.get(msg0.groupId()).listen(f -> super.notifyListener(sndId, msg, msgC));
 
-                if (grpId == CU.cacheId(GROUP_1) || grpId == CU.cacheId(GROUP_2)) {
-                    try {
-                        U.sleep(50);
-                    }
-                    catch (IgniteInterruptedCheckedException e) {
-                        e.printStackTrace();
+                        return;
                     }
                 }
             }
+
+            super.notifyListener(sndId, msg, msgC);
         }
     }
 }