IGNITE-10926 ZookeeperDiscoverySpi: client does not survive after several cluster...
author NSAmelchev <nsamelchev@gmail.com>
Wed, 13 Feb 2019 09:47:32 +0000 (12:47 +0300)
committer Ivan Rakov <irakov@apache.org>
Wed, 13 Feb 2019 09:47:32 +0000 (12:47 +0300)
Signed-off-by: Ivan Rakov <irakov@apache.org>
modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkRuntimeState.java
modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryClientDisconnectTest.java
modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoverySpiTestBase.java

index 8e13c8b..0aac6db 100644 (file)
@@ -36,7 +36,7 @@ class ZkRuntimeState {
     volatile Exception errForClose;
 
     /** */
-    final boolean prevJoined;
+    final boolean reconnect;
 
     /** */
     ZookeeperClient zkClient;
@@ -89,10 +89,10 @@ class ZkRuntimeState {
     boolean updateAlives;
 
     /**
-     * @param prevJoined {@code True} if joined topology before reconnect attempt.
+     * @param reconnect {@code True} if a client node reconnects after it had previously joined the topology.
      */
-    ZkRuntimeState(boolean prevJoined) {
-        this.prevJoined = prevJoined;
+    ZkRuntimeState(boolean reconnect) {
+        this.reconnect = reconnect;
     }
 
     /**
index 6e139ca..b21e335 100644 (file)
@@ -745,17 +745,15 @@ public class ZookeeperDiscoveryImpl {
             return;
 
         try {
-            boolean reconnect = prevState != null;
-
             // Need fire EVT_CLIENT_NODE_RECONNECTED event if reconnect after already joined.
-            boolean prevJoined = prevState != null && prevState.joined;
+            boolean reconnect = locNode.isClient() && prevState != null && (prevState.joined || prevState.reconnect);
 
             IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr;
 
             if (internalLsnr != null)
                 internalLsnr.beforeJoin(locNode, log);
 
-            if (locNode.isClient() && reconnect)
+            if (reconnect)
                 locNode.setAttributes(spi.getLocNodeAttrs());
 
             marshalCredentialsOnJoin(locNode);
@@ -767,7 +765,7 @@ public class ZookeeperDiscoveryImpl {
                 connState = ConnectionState.STARTED;
             }
 
-            ZkRuntimeState rtState = this.rtState = new ZkRuntimeState(prevJoined);
+            ZkRuntimeState rtState = this.rtState = new ZkRuntimeState(reconnect);
 
             DiscoveryDataBag discoDataBag = new DiscoveryDataBag(locNode.id(), locNode.isClient());
 
@@ -977,8 +975,7 @@ public class ZookeeperDiscoveryImpl {
      * @throws InterruptedException If interrupted.
      */
     private void startJoin(ZkRuntimeState rtState, @Nullable ZkRuntimeState prevState, final byte[] joinDataBytes)
-        throws InterruptedException
-    {
+        throws InterruptedException {
         try {
             long startTime = System.currentTimeMillis();
 
@@ -1255,6 +1252,14 @@ public class ZookeeperDiscoveryImpl {
                     rtState.zkClient.getDataAsync(joinDataPath,
                         CheckJoinErrorWatcher.this,
                         CheckJoinErrorWatcher.this);
+
+                    // Client alive node can be deleted if join request wasn't processed between cluster restarts.
+                    if (locNode.isClient()) {
+                        ClientLocalNodeWatcher watcher = new ClientLocalNodeWatcher(rtState,
+                            CheckJoinErrorWatcher.this);
+
+                        rtState.zkClient.existsAsync(rtState.locNodeZkPath, watcher, watcher);
+                    }
                 }
             };
         }
@@ -1264,6 +1269,27 @@ public class ZookeeperDiscoveryImpl {
             if (rc != 0)
                 return;
 
+            processData(data);
+        }
+
+        /**
+         * Checks for join error synchronously.
+         */
+        void checkJoinError() {
+            try {
+                byte[] data = rtState.zkClient.getData(joinDataPath);
+
+                processData(data);
+            }
+            catch (Exception ignored) {
+                // No-op.
+            }
+        }
+
+        /**
+         * @param data Data.
+         */
+        private void processData(byte[] data) {
             if (!onProcessStart())
                 return;
 
@@ -2994,7 +3020,7 @@ public class ZookeeperDiscoveryImpl {
                 Collections.emptyMap(),
                 null).get();
 
-            if (rtState.prevJoined) {
+            if (rtState.reconnect) {
                 lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
                     joinedEvtData.topVer,
                     locNode,
@@ -4336,6 +4362,44 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * Watcher for the local node. The local node can be deleted in case of cluster restarts.
+     * See {@link #cleanupPreviousClusterData}.
+     */
+    private class ClientLocalNodeWatcher extends PreviousNodeWatcher {
+        final CheckJoinErrorWatcher joinErrorWatcher;
+
+        /**
+         * @param rtState Runtime state.
+         */
+        ClientLocalNodeWatcher(ZkRuntimeState rtState, CheckJoinErrorWatcher joinErrorWatcher) {
+            super(rtState);
+
+            assert locNode.isClient() : locNode;
+
+            this.joinErrorWatcher = joinErrorWatcher;
+        }
+
+        /** {@inheritDoc} */
+        @Override void onPreviousNodeFail() {
+            // Check that there are no errors in join data.
+            joinErrorWatcher.checkJoinError();
+
+            if (rtState.errForClose != null || rtState.joined)
+                return;
+
+            synchronized (stateMux) {
+                if (connState != ConnectionState.STARTED)
+                    return;
+            }
+
+            if (log.isInfoEnabled())
+                log.info("Watched local node failed [locId=" + locNode.id() + ']');
+
+            localNodeFail("Local node was forced to stop.", true);
+        }
+    }
+
+    /**
      *
      */
     class CheckCoordinatorCallback extends ZkAbstractChildrenCallback {
index 9a52e56..7578fa4 100644 (file)
@@ -416,6 +416,7 @@ public class ZookeeperDiscoveryClientDisconnectTest extends ZookeeperDiscoverySp
     public void testReconnectServersRestart_2() throws Exception {
         reconnectServersRestart(3);
     }
+
     /**
      * @throws Exception If failed.
      */
@@ -454,6 +455,51 @@ public class ZookeeperDiscoveryClientDisconnectTest extends ZookeeperDiscoverySp
     }
 
     /**
+     * Checks that a client will reconnect after previous cluster data was cleaned.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReconnectServersRestart_4() throws Exception {
+        startGrid(0);
+
+        helper.clientMode(true);
+
+        IgniteEx client = startGrid(1);
+
+        helper.clientMode(false);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        client.events().localListen(event -> {
+            latch.countDown();
+
+            return true;
+        }, EVT_CLIENT_NODE_DISCONNECTED);
+
+        waitForTopology(2);
+
+        stopGrid(0);
+
+        evts.clear();
+
+        // Wait for the client to start reconnecting and to create its join request.
+        assertTrue("Failed to wait for client node disconnected.", latch.await(10, SECONDS));
+
+        // Restart the cluster twice to increment the internal order (alive zk-nodes with a lower order that contain
+        // the client join request will be removed). See {@link ZookeeperDiscoveryImpl#cleanupPreviousClusterData}.
+        startGrid(0);
+
+        stopGrid(0);
+
+        evts.clear();
+
+        startGrid(0);
+
+        waitForTopology(2);
+    }
+
+    /**
      * @throws Exception If failed.
      */
     @Test
index dcbb9ba..ccacbc1 100644 (file)
@@ -118,7 +118,12 @@ class ZookeeperDiscoverySpiTestBase extends GridCommonAbstractTest {
     /** */
     protected Map<String, Object> userAttrs;
 
-    /** */
+    /**
+     * Map for checking discovery events per test. The {@code EVT_NODE_JOINED}, {@code EVT_NODE_FAILED} and
+     * {@code EVT_NODE_LEFT} events should be handled only once per topology version.
+     * <p>
+     * Needs to be cleared in case of a cluster restart.
+     */
     protected static ConcurrentHashMap<UUID, Map<T2<Integer, Long>, DiscoveryEvent>> evts = new ConcurrentHashMap<>();
 
     /** */