Optimize HelixTaskExecutor reset() in event of shutdown (#2183) master
authorQi (Quincy) Qu <qqu@linkedin.com>
Thu, 11 Aug 2022 17:57:59 +0000 (10:57 -0700)
committerGitHub <noreply@github.com>
Thu, 11 Aug 2022 17:57:59 +0000 (10:57 -0700)
Optimize HelixTaskExecutor reset() in event of shutdown

Some instances maybe reset() multiple times during participant shutdown.
This commit refactor logic in HelixTaskExecutor to reduce unnecessary
method call.

helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java

index 80b70bbc3f884a9f7eb5e962aec09922b7b15149..b21a065a92dc1bffa3a1c8813e712994e9e47414 100644 (file)
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -136,6 +137,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   private final ParticipantStatusMonitor _monitor;
   public static final String MAX_THREADS = "maxThreads";
 
+  // true if all partition state are "clean" as same after reset()
+  private volatile boolean _isCleanState = true;
   private MessageQueueMonitor _messageQueueMonitor;
   private GenericHelixController _controller;
   private Long _lastSessionSyncTime;
@@ -677,13 +680,10 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     }
   }
 
-  void reset() {
-    LOG.info("Reset HelixTaskExecutor");
-
-    if (_messageQueueMonitor != null) {
-      _messageQueueMonitor.reset();
-    }
-
+  /**
+   * Shutdown the registered thread pool executors. This method will be no-op if called repeatedly.
+   */
+  private void shutdownExecutors() {
     synchronized (_hdlrFtyRegistry) {
       for (String msgType : _hdlrFtyRegistry.keySet()) {
         // don't un-register factories, just shutdown all executors
@@ -694,17 +694,37 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
           LOG.info("Reset executor for msgType: " + msgType + ", pool: " + pool);
           shutdownAndAwaitTermination(pool, item);
         }
-
-        if (item.factory() != null) {
-          try {
-            item.factory().reset();
-          } catch (Exception ex) {
-            LOG.error("Failed to reset the factory {} of message type {}.", item.factory().toString(),
-                msgType, ex);
-          }
-        }
       }
     }
+  }
+
+  synchronized void reset() {
+    if (_isCleanState) {
+      LOG.info("HelixTaskExecutor is in clean state, no need to reset again");
+      return;
+    }
+    LOG.info("Reset HelixTaskExecutor");
+
+    if (_messageQueueMonitor != null) {
+      _messageQueueMonitor.reset();
+    }
+
+    shutdownExecutors();
+
+    synchronized (_hdlrFtyRegistry) {
+      _hdlrFtyRegistry.values()
+          .stream()
+          .map(MsgHandlerFactoryRegistryItem::factory)
+          .distinct()
+          .filter(Objects::nonNull)
+          .forEach(factory -> {
+            try {
+              factory.reset();
+            } catch (Exception ex) {
+              LOG.error("Failed to reset the factory {}.", factory.toString(), ex);
+            }
+          });
+    }
     // threads pool specific to STATE_TRANSITION.Key specific pool are not shut down.
     // this is a potential area to improve. https://github.com/apache/helix/issues/1245
 
@@ -712,8 +732,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     // Log all tasks that fail to terminate
     for (String taskId : _taskMap.keySet()) {
       MessageTaskInfo info = _taskMap.get(taskId);
-      sb.append(
-          "Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage() + "\n");
+      sb.append("Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage() + "\n");
     }
 
     LOG.info(sb.toString());
@@ -724,6 +743,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     _knownMessageIds.clear();
 
     _lastSessionSyncTime = null;
+    _isCleanState = true;
   }
 
   void init() {
@@ -744,7 +764,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         _monitor.createExecutorMonitor(type, newPool);
         return newPool;
       });
-      LOG.info("Setup the thread pool for type: %s, isShutdown: %s", msgType, pool.isShutdown());
+      LOG.info("Setup the thread pool for type: {}, isShutdown: {}", msgType, pool.isShutdown());
     }
   }
 
@@ -835,6 +855,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       init();
       // continue to process messages
     }
+    _isCleanState = false;
 
     // if prefetch is disabled in MessageListenerCallback, we need to read all new messages from zk.
     if (messages == null || messages.isEmpty()) {
@@ -1442,7 +1463,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       nopMsg.setTgtName(instanceName);
       accessor
           .setProperty(accessor.keyBuilder().message(nopMsg.getTgtName(), nopMsg.getId()), nopMsg);
-      LOG.info("Send NO_OP message to {}}, msgId: {}.", nopMsg.getTgtName(), nopMsg.getId());
+      LOG.info("Send NO_OP message to {}, msgId: {}.", nopMsg.getTgtName(), nopMsg.getId());
     } catch (Exception e) {
       LOG.error("Failed to send NO_OP message to {}.", instanceName, e);
     }
@@ -1454,6 +1475,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     _isShuttingDown = true;
     _timer.cancel();
 
+    shutdownExecutors();
     reset();
     _monitor.shutDown();
     LOG.info("Shutdown HelixTaskExecutor finished");
index 50a078282aa99e5f03fda33f07344aa9abdd8129..1a16de4515e9d79d29bf87929f320e62816805e9 100644 (file)
@@ -164,6 +164,7 @@ public class HelixStateMachineEngine implements StateMachineEngine {
   }
 
   private void loopStateModelFactories(Consumer<StateModel> consumer) {
+    // TODO: evaluate impact and consider parallelization
     for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : _stateModelFactoryMap
         .values()) {
       for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values()) {
index 7b239908a9b801394d4e0016e9c4d7a575b0f3b9..1a8cc5ac4da0384b84f1e96a0fde8776008bee1b 100644 (file)
@@ -132,6 +132,21 @@ public class TestHelixTaskExecutor {
     }
   }
 
+  private class TestMessageHandlerFactory3 extends TestMessageHandlerFactory {
+    private boolean _resetDone = false;
+
+    @Override
+    public List<String> getMessageTypes() {
+      return ImmutableList.of("msgType1", "msgType2", "msgType3");
+    }
+
+    @Override
+    public void reset() {
+      Assert.assertFalse(_resetDone, "reset() should only be triggered once in TestMessageHandlerFactory3");
+      _resetDone = true;
+    }
+  }
+
   class CancellableHandlerFactory implements MultiTypeMessageHandlerFactory {
 
     int _handlersCreated = 0;
@@ -798,6 +813,29 @@ public class TestHelixTaskExecutor {
     System.out.println("END TestCMTaskExecutor.testHandlerResetTimeout()");
   }
 
+  @Test
+  public void testMsgHandlerRegistryAndShutdown() {
+    HelixTaskExecutor executor = new HelixTaskExecutor();
+    HelixManager manager = new MockClusterManager();
+    TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+    TestMessageHandlerFactory3 factoryMulti = new TestMessageHandlerFactory3();
+    executor.registerMessageHandlerFactory(factory, HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, 200);
+    executor.registerMessageHandlerFactory(factoryMulti, HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, 200);
+
+    final Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
+    msg.setTgtSessionId("*");
+    msg.setTgtName("Localhost_1123");
+    msg.setSrcName("127.101.1.23_2234");
+
+    NotificationContext changeContext = new NotificationContext(manager);
+    changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+    executor.onMessage("some", Collections.singletonList(msg), changeContext);
+    Assert.assertEquals(executor._hdlrFtyRegistry.size(), 4);
+    // Ensure TestMessageHandlerFactory3 instance is reset and reset exactly once
+    executor.shutdown();
+    Assert.assertTrue(factoryMulti._resetDone, "TestMessageHandlerFactory3 should be reset");
+  }
+
   @Test()
   public void testNoRetry() throws InterruptedException {
     System.out.println("START " + TestHelper.getTestMethodName());