IGNITE-7639 Fixed NPE
authorAlexey Goncharuk <alexey.goncharuk@gmail.com>
Wed, 7 Feb 2018 18:10:32 +0000 (21:10 +0300)
committerAlexey Goncharuk <alexey.goncharuk@gmail.com>
Wed, 7 Feb 2018 18:10:32 +0000 (21:10 +0300)
modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java

index dea2ce7..b022754 100644 (file)
@@ -28,22 +28,36 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Discovery data related to cluster state.
+ * A pojo-object representing current cluster global state. The state includes cluster active flag and cluster
+ * baseline topology.
+ * <p>
+ * This object also captures a transitional cluster state, when one or more fields are changing. In this case,
+ * a {@code transitionReqId} field is set to a non-null value and {@code prevState} captures previous cluster state.
+ * A joining node catching the cluster in an intermediate state will observe {@code transitionReqId} field to be
+ * non-null, however the {@code prevState} will not be sent to the joining node.
+ *
+ * TODO https://issues.apache.org/jira/browse/IGNITE-7640 This class must be immutable, transitionRes must be set by calling finish().
  */
 public class DiscoveryDataClusterState implements Serializable {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** */
+    /** Flag indicating if the cluster in in active state. */
     private final boolean active;
 
-    /** */
+    /** Current cluster baseline topology. */
     @Nullable private final BaselineTopology baselineTopology;
 
-    /** */
+    /**
+     * Transition request ID. Set to a non-null value if the cluster is changing it's state.
+     * The ID is assigned on the initiating node.
+     */
     private final UUID transitionReqId;
 
-    /** Topology version for state change exchange. */
+    /**
+     * Topology version in the cluster when state change request was received by the coordinator.
+     * The exchange fired for the cluster state change will be on version {@code transitionTopVer.nextMinorVersion()}.
+     */
     @GridToStringInclude
     private final AffinityTopologyVersion transitionTopVer;
 
@@ -51,13 +65,18 @@ public class DiscoveryDataClusterState implements Serializable {
     @GridToStringExclude
     private final Set<UUID> transitionNodes;
 
-    /** Local flag for state transition result (global state is updated asynchronously by custom message). */
+    /**
+     * Local flag for state transition active state result (global state is updated asynchronously by custom message),
+     * {@code null} means that state change is not completed yet.
+     */
     private transient volatile Boolean transitionRes;
 
-    /** */
+    /**
+     * Previous cluster state if this state is a transition state and it was not received by a joining node.
+     */
     private transient DiscoveryDataClusterState prevState;
 
-    /** */
+    /** Transition result error. */
     private transient volatile Exception transitionError;
 
     /**
@@ -86,6 +105,7 @@ public class DiscoveryDataClusterState implements Serializable {
         assert transitionReqId != null;
         assert transitionTopVer != null;
         assert !F.isEmpty(transitionNodes) : transitionNodes;
+        assert prevState != null;
 
         return new DiscoveryDataClusterState(
             prevState,
@@ -156,7 +176,7 @@ public class DiscoveryDataClusterState implements Serializable {
      * @return {@code True} if cluster active state change is in progress, {@code false} otherwise.
      */
     public boolean activeStateChanging() {
-        return transition() && active != prevState.active;
+        return transition() && (prevState == null || (prevState.active != active));
     }
 
     /**
@@ -202,6 +222,9 @@ public class DiscoveryDataClusterState implements Serializable {
     }
 
     /**
+     * Creates a non-transitional cluster state. This method effectively cleans all fields identifying the
+     * state as transitional and creates a new state with the state transition result.
+     *
      * @param success Transition success status.
      * @return Cluster state that finished transition.
      */
index 71718c9..2337329 100644 (file)
@@ -37,7 +37,6 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
@@ -49,7 +48,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -85,9 +83,6 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
     private boolean testSpi;
 
     /** */
-    private boolean testDiscoSpi;
-
-    /** */
     private boolean testReconnectSpi;
 
     /** */
@@ -104,8 +99,6 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
 
             spi.setJoinTimeout(2 * 60_000);
         }
-        else if (testDiscoSpi)
-            cfg.setDiscoverySpi(new TestTcpDiscoverySpi());
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
 
@@ -220,14 +213,14 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
         }
 
         for (int i = 0; i < srvs + clients; i++)
-            assertFalse(ignite(i).active());
+            assertFalse(ignite(i).cluster().active());
 
-        ignite(activateFrom).active(false); // Should be no-op.
+        ignite(activateFrom).cluster().active(false); // Should be no-op.
 
-        ignite(activateFrom).active(true);
+        ignite(activateFrom).cluster().active(true);
 
         for (int i = 0; i < srvs + clients; i++)
-            assertTrue(ignite(i).active());
+            assertTrue(ignite(i).cluster().active());
 
         for (int i = 0; i < srvs + clients; i++) {
             for (int c = 0; c < 2; c++)
@@ -308,16 +301,14 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
     private void joinWhileActivate1(final boolean startClient, final boolean withNewCache) throws Exception {
         IgniteInternalFuture<?> activeFut = startNodesAndBlockStatusChange(2, 0, 0, false);
 
-        IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                client = startClient;
+        IgniteInternalFuture<?> startFut = GridTestUtils.runAsync((Callable<Void>)() -> {
+            client = startClient;
 
-                ccfgs = withNewCache ? cacheConfigurations2() : cacheConfigurations1();
+            ccfgs = withNewCache ? cacheConfigurations2() : cacheConfigurations1();
 
-                startGrid(2);
+            startGrid(2);
 
-                return null;
-            }
+            return null;
         });
 
         TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(ignite(1));
@@ -376,7 +367,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
         int minorVer = 1;
 
         if (initiallyActive && persistenceEnabled()) {
-            ignite(0).active(true);
+            ignite(0).cluster().active(true);
 
             minorVer++;
         }
@@ -396,11 +387,9 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
             blockExchangeSingleMessage(spi, STATE_CHANGE_TOP_VER);
         }
 
-        IgniteInternalFuture<?> stateChangeFut = GridTestUtils.runAsync(new Runnable() {
-            @Override public void run() {
-                ignite(stateChangeFrom).active(!initiallyActive);
-            }
-        });
+        IgniteInternalFuture<?> stateChangeFut = GridTestUtils.runAsync(() ->
+            ignite(stateChangeFrom).cluster().active(!initiallyActive)
+        );
 
         for (TestRecordingCommunicationSpi spi : spis)
             spi.waitForBlocked();
@@ -417,17 +406,15 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
      * @param topVer Exchange topology version.
      */
     private void blockExchangeSingleMessage(TestRecordingCommunicationSpi spi, final AffinityTopologyVersion topVer) {
-        spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
-            @Override public boolean apply(ClusterNode clusterNode, Message msg) {
-                if (msg instanceof GridDhtPartitionsSingleMessage) {
-                    GridDhtPartitionsSingleMessage pMsg = (GridDhtPartitionsSingleMessage)msg;
-
-                    if (pMsg.exchangeId() != null && pMsg.exchangeId().topologyVersion().equals(topVer))
-                        return true;
-                }
+        spi.blockMessages((IgniteBiPredicate<ClusterNode, Message>)(clusterNode, msg) -> {
+            if (msg instanceof GridDhtPartitionsSingleMessage) {
+                GridDhtPartitionsSingleMessage pMsg = (GridDhtPartitionsSingleMessage)msg;
 
-                return false;
+                if (pMsg.exchangeId() != null && pMsg.exchangeId().topologyVersion().equals(topVer))
+                    return true;
             }
+
+            return false;
         });
     }
 
@@ -460,16 +447,14 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
     private void joinWhileDeactivate1(final boolean startClient, final boolean withNewCache) throws Exception {
         IgniteInternalFuture<?> activeFut = startNodesAndBlockStatusChange(2, 0, 0, true);
 
-        IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                client = startClient;
+        IgniteInternalFuture<?> startFut = GridTestUtils.runAsync((Callable<Void>)() -> {
+            client = startClient;
 
-                ccfgs = withNewCache ? cacheConfigurations2() : cacheConfigurations1();
+            ccfgs = withNewCache ? cacheConfigurations2() : cacheConfigurations1();
 
-                startGrid(2);
+            startGrid(2);
 
-                return null;
-            }
+            return null;
         });
 
         TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(ignite(1));
@@ -481,7 +466,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
 
         checkNoCaches(3);
 
-        ignite(2).active(true);
+        ignite(2).cluster().active(true);
 
         for (int c = 0; c < 2; c++)
             checkCache(ignite(2), CACHE_NAME_PREFIX + c, true);
@@ -529,30 +514,26 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
 
             final CyclicBarrier b = new CyclicBarrier(START_NODES + 1);
 
-            IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
-                @Override public Void call() throws Exception {
-                    b.await();
+            IgniteInternalFuture<Void> fut1 = GridTestUtils.runAsync(() -> {
+                b.await();
 
-                    Thread.sleep(ThreadLocalRandom.current().nextLong(100) + 1);
+                U.sleep(ThreadLocalRandom.current().nextLong(100) + 1);
 
-                    ignite(0).active(true);
+                ignite(0).cluster().active(true);
 
-                    return null;
-                }
+                return null;
             });
 
             final AtomicInteger nodeIdx = new AtomicInteger(3);
 
-            IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
-                @Override public Void call() throws Exception {
-                    int idx = nodeIdx.getAndIncrement();
+            IgniteInternalFuture<Long> fut2 = GridTestUtils.runMultiThreadedAsync((Callable<Void>)() -> {
+                int idx = nodeIdx.getAndIncrement();
 
-                    b.await();
+                b.await();
 
-                    startGrid(idx);
+                startGrid(idx);
 
-                    return null;
-                }
+                return null;
             }, START_NODES, "start-node");
 
             fut1.get();
@@ -619,19 +600,19 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
         }
 
         if (persistenceEnabled())
-            ignite(deactivateFrom).active(true);
+            ignite(deactivateFrom).cluster().active(true);
 
-        ignite(deactivateFrom).active(true); // Should be no-op.
+        ignite(deactivateFrom).cluster().active(true); // Should be no-op.
 
         checkCaches(srvs + clients, CACHES);
 
         for (int i = 0; i < srvs + clients; i++)
-            assertTrue(ignite(i).active());
+            assertTrue(ignite(i).cluster().active());
 
-        ignite(deactivateFrom).active(false);
+        ignite(deactivateFrom).cluster().active(false);
 
         for (int i = 0; i < srvs + clients; i++)
-            assertFalse(ignite(i).active());
+            assertFalse(ignite(i).cluster().active());
 
         checkNoCaches(srvs + clients);
 
@@ -648,12 +629,12 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
         checkNoCaches(srvs + clients + 2);
 
         for (int i = 0; i < srvs + clients + 2; i++)
-            assertFalse(ignite(i).active());
+            assertFalse(ignite(i).cluster().active());
 
-        ignite(deactivateFrom).active(true);
+        ignite(deactivateFrom).cluster().active(true);
 
         for (int i = 0; i < srvs + clients + 2; i++) {
-            assertTrue(ignite(i).active());
+            assertTrue(ignite(i).cluster().active());
 
             checkCache(ignite(i), CU.UTILITY_CACHE_NAME, true);
         }
@@ -695,7 +676,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
         startWithCaches1(SRVS, CLIENTS);
 
         if (persistenceEnabled())
-            ignite(0).active(true);
+            ignite(0).cluster().active(true);
 
         Ignite srv = ignite(0);
         Ignite client = ignite(SRVS);
@@ -741,7 +722,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
 
         checkNoCaches(SRVS + CLIENTS);
 
-        ignite(0).active(true);
+        ignite(0).cluster().active(true);
 
         checkCache(client, CU.UTILITY_CACHE_NAME, true);
 
@@ -789,39 +770,38 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
         IgniteEx client = grid(SRVS);
 
         if (persistenceEnabled())
-            ignite(0).active(true);
+            ignite(0).cluster().active(true);
 
         checkCache(client, CU.UTILITY_CACHE_NAME, true);
 
         checkCaches1(SRVS + CLIENTS);
 
+        // Wait for late affinity assignment to finish.
+        grid(0).context().cache().context().exchange().affinityReadyFuture(
+            new AffinityTopologyVersion(SRVS + CLIENTS, 1)).get();
+
         final AffinityTopologyVersion STATE_CHANGE_TOP_VER = new AffinityTopologyVersion(SRVS + CLIENTS + 1, 1);
 
         final TestRecordingCommunicationSpi spi1 = transition ? TestRecordingCommunicationSpi.spi(ignite(1)) : null;
 
         final AtomicReference<IgniteInternalFuture> stateFut = new AtomicReference<>();
 
-        IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, new Runnable() {
-            @Override public void run() {
-                if (transition) {
-                    blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER);
-
-                    stateFut.set(GridTestUtils.runAsync(new Runnable() {
-                        @Override public void run() {
-                            srv.active(false);
-                        }
-                    }, "deactivate"));
-
-                    try {
-                        U.sleep(500);
-                    }
-                    catch (Exception e) {
-                        e.printStackTrace();
-                    }
+        IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, () -> {
+            if (transition) {
+                blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER);
+
+                stateFut.set(GridTestUtils.runAsync(() -> srv.cluster().active(false),
+                    "deactivate"));
+
+                try {
+                    U.sleep(500);
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
                 }
-                else
-                    srv.active(false);
             }
+            else
+                srv.cluster().active(false);
         });
 
         if (transition) {
@@ -839,11 +819,11 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
 
         checkNoCaches(SRVS + CLIENTS);
 
-        ignite(0).active(true);
+        ignite(0).cluster().active(true);
 
         checkCache(client, CU.UTILITY_CACHE_NAME, true);
 
-        assertTrue(client.active());
+        assertTrue(client.cluster().active());
 
         checkCaches1(SRVS + CLIENTS);
 
@@ -900,27 +880,22 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
 
         final AtomicReference<IgniteInternalFuture> stateFut = new AtomicReference<>();
 
-        IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, new Runnable() {
-            @Override public void run() {
-                if (transition) {
-                    blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER);
-
-                    stateFut.set(GridTestUtils.runAsync(new Runnable() {
-                        @Override public void run() {
-                            srv.active(true);
-                        }
-                    }, "activate"));
-
-                    try {
-                        U.sleep(500);
-                    }
-                    catch (Exception e) {
-                        e.printStackTrace();
-                    }
+        IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, () -> {
+            if (transition) {
+                blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER);
+
+                stateFut.set(GridTestUtils.runAsync(() -> srv.cluster().active(true),
+                    "activate"));
+
+                try {
+                    U.sleep(500);
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
                 }
-                else
-                    srv.active(true);
             }
+            else
+                srv.cluster().active(true);
         });
 
         if (transition) {
@@ -989,7 +964,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
 
         checkRecordedMessages(false);
 
-        ignite(0).active(true);
+        ignite(0).cluster().active(true);
 
         checkCaches1(SRVS + CLIENTS);
 
@@ -1033,12 +1008,10 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
         client = false;
 
         // Start one more node while transition is in progress.
-        IgniteInternalFuture startFut = GridTestUtils.runAsync(new Callable() {
-            @Override public Object call() throws Exception {
-                startGrid(8);
+        IgniteInternalFuture<Void> startFut = GridTestUtils.runAsync(() -> {
+            startGrid(8);
 
-                return null;
-            }
+            return null;
         }, "start-node");
 
         U.sleep(500);
@@ -1061,7 +1034,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
         if (!activate) {
             checkNoCaches(9);
 
-            ignite(0).active(true);
+            ignite(0).cluster().active(true);
         }
 
         checkCaches1(9);
@@ -1092,19 +1065,16 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
         client = false;
 
         // Start more nodes while transition is in progress.
-        IgniteInternalFuture startFut1 = GridTestUtils.runAsync(new Callable() {
-            @Override public Object call() throws Exception {
-                startGrid(8);
+        IgniteInternalFuture<Void> startFut1 = GridTestUtils.runAsync(() -> {
+            startGrid(8);
 
-                return null;
-            }
+            return null;
         }, "start-node1");
-        IgniteInternalFuture startFut2 = GridTestUtils.runAsync(new Callable() {
-            @Override public Object call() throws Exception {
-                startGrid(9);
 
-                return null;
-            }
+        IgniteInternalFuture<Void> startFut2 = GridTestUtils.runAsync(() -> {
+            startGrid(9);
+
+            return null;
         }, "start-node2");
 
         U.sleep(500);
@@ -1132,7 +1102,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
         if (!activate) {
             checkNoCaches(10);
 
-            ignite(0).active(true);
+            ignite(0).cluster().active(true);
         }
 
         checkCaches1(10);
@@ -1214,7 +1184,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
 
         ((IgniteEx)node).context().state().publicApiActiveState(true);
 
-        GridCacheAdapter cache = ((IgniteKernal)node).context().cache().internalCache(cacheName);
+        GridCacheAdapter cache = ((IgniteEx)node).context().cache().internalCache(cacheName);
 
         if (exp)
             assertNotNull("Cache not found [cache=" + cacheName + ", node=" + node.name() + ']', cache);
@@ -1229,7 +1199,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
         for (int i = 0; i < nodes; i++) {
             grid(i).context().state().publicApiActiveState(true);
 
-            GridCacheProcessor cache = ((IgniteKernal)ignite(i)).context().cache();
+            GridCacheProcessor cache = ((IgniteEx)ignite(i)).context().cache();
 
             assertTrue(cache.caches().isEmpty());
             assertTrue(cache.internalCaches().isEmpty());