IGNITE-10899 Service Grid: disconnecting during node stop may lead to deadlock. ...
authorVyacheslav Daradur <daradurvs@gmail.com>
Wed, 16 Jan 2019 15:52:10 +0000 (18:52 +0300)
committerDmitriy Pavlov <dpavlov@apache.org>
Wed, 16 Jan 2019 15:52:47 +0000 (18:52 +0300)
Signed-off-by: Dmitriy Pavlov <dpavlov@apache.org>
modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java
modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentManager.java
modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java

index 1bc1fd7..c164c6f 100644 (file)
@@ -424,6 +424,9 @@ public class IgniteServiceProcessor extends ServiceProcessorAdapter implements I
         opsLock.writeLock().lock();
 
         try {
+            if (ctx.isStopping())
+                return;
+
             disconnected = true;
 
             stopProcessor(new IgniteClientDisconnectedCheckedException(
index e2a3add..9851c5f 100644 (file)
@@ -134,23 +134,28 @@ public class ServiceDeploymentManager {
      * @param stopErr Cause error of deployment manager stop.
      */
     void stopProcessing(IgniteCheckedException stopErr) {
-        busyLock.block(); // Will not release it.
+        busyLock.block();
 
-        ctx.event().removeDiscoveryEventListener(discoLsnr);
+        try {
+            ctx.event().removeDiscoveryEventListener(discoLsnr);
 
-        ctx.io().removeMessageListener(commLsnr);
+            ctx.io().removeMessageListener(commLsnr);
 
-        U.cancel(depWorker);
+            U.cancel(depWorker);
 
-        U.join(depWorker, log);
+            U.join(depWorker, log);
 
-        depWorker.tasksQueue.clear();
+            depWorker.tasksQueue.clear();
 
-        pendingEvts.clear();
+            pendingEvts.clear();
 
-        tasks.values().forEach(t -> t.completeError(stopErr));
+            tasks.values().forEach(t -> t.completeError(stopErr));
 
-        tasks.clear();
+            tasks.clear();
+        }
+        finally {
+            busyLock.unblock();
+        }
     }
 
     /**
@@ -301,14 +306,14 @@ public class ServiceDeploymentManager {
             if (!enterBusy())
                 return;
 
-            final UUID snd = evt.eventNode().id();
-            final int evtType = evt.type();
+            try {
+                final UUID snd = evt.eventNode().id();
+                final int evtType = evt.type();
 
-            assert snd != null : "Event's node id shouldn't be null.";
-            assert evtType == EVT_NODE_JOINED || evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED
-                || evtType == EVT_DISCOVERY_CUSTOM_EVT : "Unexpected event was received, evt=" + evt;
+                assert snd != null : "Event's node id shouldn't be null.";
+                assert evtType == EVT_NODE_JOINED || evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED
+                    || evtType == EVT_DISCOVERY_CUSTOM_EVT : "Unexpected event was received, evt=" + evt;
 
-            try {
                 if (evtType == EVT_DISCOVERY_CUSTOM_EVT) {
                     DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
 
index ac3aaa6..fcf2938 100644 (file)
@@ -31,12 +31,15 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceContext;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -171,6 +174,78 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void disconnectingDuringNodeStoppingIsNotHangTest() throws Exception {
+        Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
+
+        runServiceProcessorStoppingTest(
+            new IgniteInClosure<IgniteServiceProcessor>() {
+                @Override public void apply(IgniteServiceProcessor srvcProc) {
+                    srvcProc.onKernalStop(true);
+                }
+            },
+            new IgniteInClosure<IgniteServiceProcessor>() {
+                @Override public void apply(IgniteServiceProcessor srvcProc) {
+                    srvcProc.onDisconnected(new IgniteFinishedFutureImpl<>());
+                }
+            }
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void stoppingDuringDisconnectingIsNotHangTest() throws Exception {
+        Assume.assumeTrue(isEventDrivenServiceProcessorEnabled());
+
+        runServiceProcessorStoppingTest(
+            new IgniteInClosure<IgniteServiceProcessor>() {
+                @Override public void apply(IgniteServiceProcessor srvcProc) {
+                    srvcProc.onDisconnected(new IgniteFinishedFutureImpl<>());
+                }
+            },
+            new IgniteInClosure<IgniteServiceProcessor>() {
+                @Override public void apply(IgniteServiceProcessor srvcProc) {
+                    srvcProc.onKernalStop(true);
+                }
+            }
+        );
+    }
+
+    /**
+     * @param c1 Action to apply over service processor concurrently.
+     * @param c2 Action to apply over service processor concurrently.
+     * @throws Exception In case of an error.
+     */
+    private void runServiceProcessorStoppingTest(IgniteInClosure<IgniteServiceProcessor> c1,
+        IgniteInClosure<IgniteServiceProcessor> c2) throws Exception {
+
+        try {
+            final IgniteEx ignite = startGrid(0);
+
+            final IgniteServiceProcessor srvcProc = (IgniteServiceProcessor)(ignite.context().service());
+
+            final IgniteInternalFuture c1Fut = GridTestUtils.runAsync(() -> {
+                c1.apply(srvcProc);
+            });
+
+            final IgniteInternalFuture c2Fut = GridTestUtils.runAsync(() -> {
+                c2.apply(srvcProc);
+            });
+
+            c1Fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
+
+            c2Fut.get(getTestTimeout(), TimeUnit.MILLISECONDS);
+        }
+        finally {
+            stopAllGrids(true);
+        }
+    }
+
+    /**
      * Simple map service.
      */
     public interface TestService {
index 081d759..30a532f 100644 (file)
@@ -103,7 +103,6 @@ import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionRollbackException;
 import org.apache.ignite.transactions.TransactionTimeoutException;
 import org.jetbrains.annotations.NotNull;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -725,7 +724,6 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest {
      * Simulate uncommitted backup transactions and test rolling back using utility.
      */
     @Test
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10899")
     public void testKillHangingRemoteTransactions() throws Exception {
         final int cnt = 3;