CURATOR-640: Run PathChildrenCache's & TreeCache's default thread isolated (#419) master
authortison <wander4096@gmail.com>
Wed, 15 Jun 2022 11:43:13 +0000 (19:43 +0800)
committerGitHub <noreply@github.com>
Wed, 15 Jun 2022 11:43:13 +0000 (13:43 +0200)
* CURATOR-640: Run PathChildrenCache default thread isolated

Signed-off-by: tison <wander4096@gmail.com>
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/PathChildrenCache.java
curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestPathChildrenCache.java
curator-recipes/src/test/java/org/apache/curator/framework/recipes/cache/TestTreeCache.java

index 73930fd42d68a705f524f58f4c1cae233e59bf06..9121a49f9dab6b3173ba75e787fc5a0986e466f3 100644 (file)
@@ -25,6 +25,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.util.function.Supplier;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.EnsureContainers;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
@@ -138,7 +139,7 @@ public class PathChildrenCache implements Closeable
             handleStateChange(newState);
         }
     };
-    public static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
+    public static final Supplier<ThreadFactory> defaultThreadFactorySupplier = () -> ThreadUtils.newThreadFactory("PathChildrenCache");
 
     /**
      * @param client the client
@@ -150,7 +151,7 @@ public class PathChildrenCache implements Closeable
     @SuppressWarnings("deprecation")
     public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode)
     {
-        this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
+        this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactorySupplier.get()), true));
     }
 
     /**
@@ -174,7 +175,7 @@ public class PathChildrenCache implements Closeable
      */
     public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
     {
-        this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
+        this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactorySupplier.get()), true));
     }
 
     /**
index 13b9c595cd8400e3a0b7a17ad9dcca6983f711d2..5e7deb76709cec15b656e320c8d95f0419423a57 100644 (file)
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import java.util.function.Supplier;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.WatcherRemoveCuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
@@ -108,7 +109,7 @@ public class TreeCache implements Closeable
             ExecutorService executor = executorService;
             if ( executor == null )
             {
-                executor = Executors.newSingleThreadExecutor(defaultThreadFactory);
+                executor = Executors.newSingleThreadExecutor(defaultThreadFactorySupplier.get());
             }
             return new TreeCache(client, path, cacheData, dataIsCompressed, maxDepth, executor, createParentNodes, disableZkWatches, selector);
         }
@@ -554,7 +555,7 @@ public class TreeCache implements Closeable
         }
     };
 
-    static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("TreeCache");
+    private static final Supplier<ThreadFactory> defaultThreadFactorySupplier = () -> ThreadUtils.newThreadFactory("TreeCache");
 
     /**
      * Create a TreeCache for the given client and path with default options.
@@ -571,7 +572,7 @@ public class TreeCache implements Closeable
      */
     public TreeCache(CuratorFramework client, String path)
     {
-        this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactory), false, false, new DefaultTreeCacheSelector());
+        this(client, path, true, false, Integer.MAX_VALUE, Executors.newSingleThreadExecutor(defaultThreadFactorySupplier.get()), false, false, new DefaultTreeCacheSelector());
     }
 
     /**
index 3790b34c05e23e47833ae1fd945825099360383c..89dd2fc5be2193de221f2d9286511cb0989797c9 100644 (file)
@@ -1151,4 +1151,45 @@ public class TestPathChildrenCache extends BaseClassForTests
             TestCleanState.closeAndTestClean(client);
         }
     }
+
+    @Test
+    public void testIsolatedThreadGroup() throws Exception {
+        AtomicReference<Throwable> exception = new AtomicReference<>();
+
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new RetryOneTime(1));
+        client.getUnhandledErrorListenable().addListener((message, e) -> exception.set(e));
+        client.start();
+
+        ThreadGroup threadGroup1 = new ThreadGroup("testGroup1");
+        Thread thread1 = new Thread(threadGroup1, () -> {
+            try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test1", true) ) {
+                cache.start();
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                exception.set(e);
+            }
+        });
+
+        thread1.start();
+        thread1.join();
+        assertNull(exception.get());
+
+        // After the thread group is destroyed, all PathChildrenCache instances
+        // will fail to start due to inability to add new threads into the first thread group
+        threadGroup1.destroy();
+
+        ThreadGroup threadGroup2 = new ThreadGroup("testGroup2");
+        Thread thread2 = new Thread(threadGroup2, () -> {
+            try ( final PathChildrenCache cache = new PathChildrenCache(client, "/test1", true) ) {
+                cache.start();
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                exception.set(e);
+            }
+        });
+
+        thread2.start();
+        thread2.join();
+        assertNull(exception.get());
+    }
 }
index 2b9e491c6adc58eff08832d74ffdd2bb2a551452..93692e6cd47f93b804a52c092528d456f48effbb 100644 (file)
@@ -25,9 +25,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import com.google.common.collect.ImmutableSet;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
+import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.compatibility.CuratorTestBase;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.zookeeper.CreateMode;
@@ -640,4 +643,45 @@ public class TestTreeCache extends BaseTestTreeCache
 
         assertNoMoreEvents();
     }
+
+    @Test
+    public void testIsolatedThreadGroup() throws Exception {
+        AtomicReference<Throwable> exception = new AtomicReference<>();
+
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new RetryOneTime(1));
+        client.getUnhandledErrorListenable().addListener((message, e) -> exception.set(e));
+        client.start();
+
+        ThreadGroup threadGroup1 = new ThreadGroup("testGroup1");
+        Thread thread1 = new Thread(threadGroup1, () -> {
+            try ( final TreeCache cache = new TreeCache(client, "/test1") ) {
+                cache.start();
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                exception.set(e);
+            }
+        });
+
+        thread1.start();
+        thread1.join();
+        assertNull(exception.get());
+
+        // After the thread group is destroyed, all PathChildrenCache instances
+        // will fail to start due to inability to add new threads into the first thread group
+        threadGroup1.destroy();
+
+        ThreadGroup threadGroup2 = new ThreadGroup("testGroup2");
+        Thread thread2 = new Thread(threadGroup2, () -> {
+            try ( final TreeCache cache = new TreeCache(client, "/test1") ) {
+                cache.start();
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                exception.set(e);
+            }
+        });
+
+        thread2.start();
+        thread2.join();
+        assertNull(exception.get());
+    }
 }