IGNITE-10815 Fix coordinator failover in case of exchanges merge and non-affinity...
authorPavel Kovalenko <jokserfn@gmail.com>
Thu, 27 Dec 2018 08:32:12 +0000 (11:32 +0300)
committerPavel Kovalenko <jokserfn@gmail.com>
Thu, 27 Dec 2018 08:32:12 +0000 (11:32 +0300)
Signed-off-by: Pavel Kovalenko <jokserfn@gmail.com>
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java

index 61d88c7..2c4a640 100644 (file)
@@ -2003,33 +2003,53 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         assert idx >= 0 && idx < exchFuts.size() - 1 : "Invalid exchange futures state [cur=" + idx +
                             ", total=" + exchFuts.size() + ']';
 
-                        final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1);
+                        GridDhtPartitionsExchangeFuture futureToFetchAffinity = null;
 
-                        assert prev.isDone() && prev.topologyVersion().compareTo(topVer) < 0 : prev;
+                        for (int i = idx + 1; i < exchFuts.size(); i++) {
+                            GridDhtPartitionsExchangeFuture prev = exchFuts.get(i);
+
+                            assert prev.isDone() && prev.topologyVersion().compareTo(topVer) < 0;
+
+                            if (prev.isMerged())
+                                continue;
+
+                            futureToFetchAffinity = prev;
+
+                            break;
+                        }
+
+                        if (futureToFetchAffinity == null)
+                            throw new IgniteCheckedException("Failed to find completed exchange future to fetch affinity.");
 
                         if (log.isDebugEnabled()) {
                             log.debug("Need initialize affinity on coordinator [" +
                                 "cacheGrp=" + desc.cacheOrGroupName() +
-                                "prevAff=" + prev.topologyVersion() + ']');
+                                "prevAff=" + futureToFetchAffinity.topologyVersion() + ']');
                         }
 
-                        GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
-                            desc.groupId(),
-                            prev.topologyVersion(),
-                            prev.events().discoveryCache());
+                        GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(
+                                cctx,
+                                desc.groupId(),
+                                futureToFetchAffinity.topologyVersion(),
+                                futureToFetchAffinity.events().discoveryCache()
+                        );
 
                         fetchFut.init(false);
 
                         final GridFutureAdapter<AffinityTopologyVersion> affFut = new GridFutureAdapter<>();
 
+                        final GridDhtPartitionsExchangeFuture futureToFetchAffinity0 = futureToFetchAffinity;
+
                         fetchFut.listen(new IgniteInClosureX<IgniteInternalFuture<GridDhtAffinityAssignmentResponse>>() {
                             @Override public void applyx(IgniteInternalFuture<GridDhtAffinityAssignmentResponse> fetchFut)
-                                throws IgniteCheckedException {
-                                fetchAffinity(prev.topologyVersion(),
-                                    prev.events(),
-                                    prev.events().discoveryCache(),
-                                    aff,
-                                    (GridDhtAssignmentFetchFuture)fetchFut);
+                                    throws IgniteCheckedException {
+                                fetchAffinity(
+                                        futureToFetchAffinity0.topologyVersion(),
+                                        futureToFetchAffinity0.events(),
+                                        futureToFetchAffinity0.events().discoveryCache(),
+                                        aff,
+                                        (GridDhtAssignmentFetchFuture)fetchFut
+                                );
 
                                 aff.calculate(topVer, fut.events(), fut.events().discoveryCache());
 
index 01c10aa..de1054b 100644 (file)
@@ -2616,7 +2616,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         GridDhtPartitionsExchangeFuture fut0 = (GridDhtPartitionsExchangeFuture)task;
 
                         if (resVer.compareTo(fut0.initialVersion()) >= 0) {
-                            fut0.finishMerged();
+                            fut0.finishMerged(resVer);
 
                             futQ.remove(fut0);
                         }
index d4d89bc..b53fe99 100644 (file)
@@ -51,6 +51,8 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDiagnosticAware;
 import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
@@ -2027,8 +2029,23 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /**
      * Finish merged future to allow GridCachePartitionExchangeManager.ExchangeFutureSet cleanup.
      */
-    public void finishMerged() {
-        super.onDone(null, null);
+    public void finishMerged(AffinityTopologyVersion resVer) {
+        synchronized (mux) {
+            if (state == null) state = ExchangeLocalState.MERGED;
+        }
+
+        done.set(true);
+
+        super.onDone(resVer, null);
+    }
+
+    /**
+     * @return {@code True} if future was merged.
+     */
+    public boolean isMerged() {
+        synchronized (mux) {
+            return state == ExchangeLocalState.MERGED;
+        }
     }
 
     /**
@@ -4420,7 +4437,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                                 cctx.kernalContext().closure().callLocal(new Callable<Void>() {
                                     @Override public Void call() throws Exception {
-                                        newCrdFut.init(GridDhtPartitionsExchangeFuture.this);
+                                        try {
+                                            newCrdFut.init(GridDhtPartitionsExchangeFuture.this);
+                                        }
+                                        catch (Throwable t) {
+                                            U.error(log, "Failed to initialize new coordinator future [topVer=" + initialVersion() + "]", t);
+
+                                            cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, t));
+
+                                            throw t;
+                                        }
 
                                         newCrdFut.listen(new CI1<IgniteInternalFuture>() {
                                             @Override public void apply(IgniteInternalFuture fut) {
index a001bd4..dc42b29 100644 (file)
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
@@ -44,23 +56,38 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstractTest {
+    /** */
+    private static final String CACHE_NAME = "cache";
+
+    /** */
+    private Supplier<CommunicationSpi> spiFactory = TcpCommunicationSpi::new;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
         cfg.setConsistentId(igniteInstanceName);
 
-        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
-
-        IgnitePredicate<ClusterNode> nodeFilter = node -> node.consistentId().equals(igniteInstanceName);
+        cfg.setCommunicationSpi(spiFactory.get());
 
         cfg.setCacheConfiguration(
-            new CacheConfiguration("cache-" + igniteInstanceName)
-                .setBackups(1)
-                .setNodeFilter(nodeFilter)
-                .setAffinity(new RendezvousAffinityFunction(false, 32))
+                new CacheConfiguration(CACHE_NAME)
+                    .setBackups(2)
+                    .setAffinity(new RendezvousAffinityFunction(false, 32))
         );
 
+        // Add cache that exists only on coordinator node.
+        if (igniteInstanceName.equals("crd")) {
+            IgnitePredicate<ClusterNode> nodeFilter = node -> node.consistentId().equals(igniteInstanceName);
+
+            cfg.setCacheConfiguration(
+                    new CacheConfiguration(CACHE_NAME + 0)
+                            .setBackups(2)
+                            .setNodeFilter(nodeFilter)
+                            .setAffinity(new RendezvousAffinityFunction(false, 32))
+            );
+        }
+
         return cfg;
     }
 
@@ -81,6 +108,8 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
      */
     @Test
     public void testNewCoordinatorCompletedExchange() throws Exception {
+        spiFactory = TestRecordingCommunicationSpi::new;
+
         IgniteEx crd = (IgniteEx) startGrid("crd");
 
         IgniteEx newCrd = startGrid(1);
@@ -150,7 +179,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
 
         // Check that all caches are operable.
         for (Ignite grid : G.allGrids()) {
-            IgniteCache cache = grid.cache("cache-" + grid.cluster().localNode().consistentId());
+            IgniteCache cache = grid.cache(CACHE_NAME);
 
             Assert.assertNotNull(cache);
 
@@ -165,6 +194,8 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
      */
     @Test
     public void testDelayedFullMessageReplacedIfCoordinatorChanged() throws Exception {
+        spiFactory = TestRecordingCommunicationSpi::new;
+
         IgniteEx crd = startGrid("crd");
 
         IgniteEx newCrd = startGrid(1);
@@ -199,6 +230,94 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
     }
 
     /**
+     * Test that exchange coordinator initialized correctly in case of exchanges merge and caches without affinity nodes.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCoordinatorChangeAfterExchangesMerge() throws Exception {
+        // Delay demand messages sending to suspend late affinity assignment.
+        spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
+            final int delay = 5_000;
+
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage demandMessage = (GridDhtPartitionDemandMessage) msg;
+
+                if (demandMessage.groupId() == GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME))
+                    return 0;
+
+                return delay;
+            }
+
+            return 0;
+        });
+
+        final IgniteEx crd = startGrid("crd");
+
+        startGrid(1);
+
+        for (int k = 0; k < 1024; k++)
+            crd.cache(CACHE_NAME).put(k, k);
+
+        // Delay sending single messages to ensure exchanges are merged.
+        spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
+            final int delay = 1_000;
+
+            if (msg instanceof GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) msg;
+
+                if (singleMsg.exchangeId() != null)
+                    return delay;
+            }
+
+            return 0;
+        });
+
+        // This should trigger exchanges merge.
+        startGridsMultiThreaded(2, 2);
+
+        // Delay sending single message from new node to have time to shutdown coordinator.
+        spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
+            final int delay = 5_000;
+
+            if (msg instanceof GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) msg;
+
+                if (singleMsg.exchangeId() != null)
+                    return delay;
+            }
+
+            return 0;
+        });
+
+        // Trigger next exchange.
+        IgniteInternalFuture startNodeFut = GridTestUtils.runAsync(() -> startGrid(4));
+
+        // Wait till other nodes will send their messages to coordinator.
+        U.sleep(2_500);
+
+        // And then stop coordinator node.
+        stopGrid("crd", true);
+
+        startNodeFut.get();
+
+        awaitPartitionMapExchange();
+
+        // Check that all caches are operable.
+        for (Ignite grid : G.allGrids()) {
+            IgniteCache cache = grid.cache(CACHE_NAME);
+
+            Assert.assertNotNull(cache);
+
+            for (int k = 0; k < 1024; k++)
+                Assert.assertEquals(k, cache.get(k));
+
+            for (int k = 0; k < 1024; k++)
+                cache.put(k, k);
+        }
+    }
+
+    /**
      * Blocks sending full message from coordinator to non-coordinator node.
      * @param from Coordinator node.
      * @param to Non-coordinator node.
@@ -222,4 +341,45 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
             return false;
         });
     }
+
+    /**
+     * Communication SPI that allows to delay sending message by predicate.
+     */
+    class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi {
+        /** Function that returns delay in milliseconds for given message. */
+        private final Function<Message, Integer> delayMessageFunc;
+
+        /** */
+        DynamicDelayingCommunicationSpi() {
+            this(msg -> 0);
+        }
+
+        /**
+         * @param delayMessageFunc Function to calculate delay for message.
+         */
+        DynamicDelayingCommunicationSpi(final Function<Message, Integer> delayMessageFunc) {
+            this.delayMessageFunc = delayMessageFunc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
+                throws IgniteSpiException {
+            try {
+                GridIoMessage ioMsg = (GridIoMessage)msg;
+
+                int delay = delayMessageFunc.apply(ioMsg.message());
+
+                if (delay > 0) {
+                    log.warning(String.format("Delay sending %s to %s", msg, node));
+
+                    U.sleep(delay);
+                }
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new IgniteSpiException(e);
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
 }
index 942bcd9..8305e66 100644 (file)
@@ -72,6 +72,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -1487,11 +1488,12 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
         for (int i = futs.size() - 1; i >= 0; i--) {
             GridDhtPartitionsExchangeFuture fut = futs.get(i);
 
-            if (fut.exchangeDone() && fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT) {
+            if (!fut.isMerged() && fut.exchangeDone() && fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT) {
                 AffinityTopologyVersion resVer = fut.topologyVersion();
 
-                if (resVer != null)
-                    doneVers.add(resVer);
+                Assert.assertNotNull(resVer);
+
+                doneVers.add(resVer);
             }
         }