In JVM dtests need to clean up after instance shutdown
authorJoseph Lynch <joe.e.lynch@gmail.com>
Sat, 8 Dec 2018 02:29:08 +0000 (18:29 -0800)
committerAlex Petrov <oleksandr.petrov@gmail.com>
Wed, 9 Jan 2019 14:00:16 +0000 (15:00 +0100)
Adds additional cleanup logic to ensure we don't leak classloaders and
their associated objects when running the in JVM dtests.

Patch by Joseph Lynch; reviewed by Alex Petrov for CASSANDRA-14922

15 files changed:
build.xml
ide/idea/workspace.xml
src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
src/java/org/apache/cassandra/hints/HintsBufferPool.java
src/java/org/apache/cassandra/hints/HintsService.java
src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
src/java/org/apache/cassandra/net/MessagingService.java
src/java/org/apache/cassandra/net/async/NettyFactory.java
src/java/org/apache/cassandra/utils/NativeLibraryDarwin.java
src/java/org/apache/cassandra/utils/NativeLibraryLinux.java
src/java/org/apache/cassandra/utils/NativeLibraryWindows.java
src/java/org/apache/cassandra/utils/concurrent/Ref.java
test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
test/distributed/org/apache/cassandra/distributed/Instance.java
test/distributed/org/apache/cassandra/distributed/TestCluster.java

index 3973689..60b66fc 100644 (file)
--- a/build.xml
+++ b/build.xml
         <jvmarg value="-ea"/>
         <jvmarg value="-Dcassandra.debugrefcount=true"/>
         <jvmarg value="-Xss256k"/>
+        <!-- When we do classloader manipulation SoftReferences can cause memory leaks
+             that can OOM our test runs. The next two settings informs our GC
+             algorithm to limit the metaspace size and clean up SoftReferences
+             more aggressively rather than waiting. See CASSANDRA-14922 for more details.
+        -->
+        <jvmarg value="-XX:MaxMetaspaceSize=256M" />
+        <jvmarg value="-XX:SoftRefLRUPolicyMSPerMB=0" />
         <jvmarg value="-Dcassandra.memtable_row_overhead_computation_step=100"/>
         <jvmarg value="-Dcassandra.test.use_prepared=${cassandra.test.use_prepared}"/>
         <jvmarg value="-Dcassandra.test.offsetseed=@{poffset}"/>
index a2dea2a..150f1a0 100644 (file)
       <option name="MAIN_CLASS_NAME" value="" />
       <option name="METHOD_NAME" value="" />
       <option name="TEST_OBJECT" value="class" />
-      <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables -Dcassandra.ring_delay_ms=1000 -Dcassandra.skip_sync=true -ea" />
+      <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables -Dcassandra.ring_delay_ms=1000 -Dcassandra.skip_sync=true -ea -XX:MaxMetaspaceSize=256M -XX:SoftRefLRUPolicyMSPerMB=0" />
       <option name="PARAMETERS" value="" />
       <option name="WORKING_DIRECTORY" value="" />
       <option name="ENV_VARIABLES" />
index e51e4c2..5e3e5cf 100644 (file)
@@ -52,7 +52,7 @@ public class ScheduledExecutors
     {
         ExecutorService[] executors = new ExecutorService[] { scheduledFastTasks, scheduledTasks, nonPeriodicTasks, optionalTasks };
         for (ExecutorService executor : executors)
-            executor.shutdown();
+            executor.shutdownNow();
         for (ExecutorService executor : executors)
             executor.awaitTermination(60, TimeUnit.SECONDS);
     }
index 25f9bc1..f705de1 100644 (file)
  */
 package org.apache.cassandra.hints;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.net.MessagingService;
+import sun.nio.ch.DirectBuffer;
 
 /**
  * A primitive pool of {@link HintsBuffer} buffers. Under normal conditions should only hold two buffers - the currently
  * written to one, and a reserve buffer to switch to when the first one is beyond capacity.
  */
-final class HintsBufferPool
+final class HintsBufferPool implements Closeable
 {
     interface FlushCallback
     {
@@ -129,4 +132,9 @@ final class HintsBufferPool
         allocatedBuffers++;
         return HintsBuffer.create(bufferSize);
     }
+
+    public void close()
+    {
+        currentBuffer.free();
+    }
 }
index 1a352c2..1fd2d1a 100644 (file)
@@ -256,6 +256,7 @@ public final class HintsService implements HintsServiceMBean
         writeExecutor.shutdownBlocking();
 
         HintsServiceDiagnostics.dispatchingShutdown(this);
+        bufferPool.close();
     }
 
     /**
index cf14c3d..01e30d6 100644 (file)
@@ -2475,4 +2475,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return reader;
     }
 
+    public static void shutdownBlocking() throws InterruptedException
+    {
+        if (syncExecutor != null)
+        {
+            syncExecutor.shutdownNow();
+            syncExecutor.awaitTermination(0, TimeUnit.SECONDS);
+        }
+        resetTidying();
+    }
 }
index 761e210..f5c064e 100644 (file)
@@ -1142,6 +1142,8 @@ public final class MessagingService implements MessagingServiceMBean
 
             if (!isTest)
                 NettyFactory.instance.close();
+
+            clearMessageSinks();
         }
         catch (Exception e)
         {
index 2366722..81de5d8 100644 (file)
@@ -389,7 +389,7 @@ public final class NettyFactory
     {
         EventLoopGroup[] groups = new EventLoopGroup[] { acceptGroup, outboundGroup, inboundGroup, streamingGroup };
         for (EventLoopGroup group : groups)
-            group.shutdownGracefully();
+            group.shutdownGracefully(0, 2, TimeUnit.SECONDS);
         for (EventLoopGroup group : groups)
             group.awaitTermination(60, TimeUnit.SECONDS);
     }
index d6f1a9e..6ed18d1 100644 (file)
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.utils;
 
+import java.util.Collections;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +52,7 @@ public class NativeLibraryDarwin implements NativeLibraryWrapper
     {
         try
         {
-            Native.register("c");
+            Native.register(com.sun.jna.NativeLibrary.getInstance("c", Collections.emptyMap()));
             available = true;
         }
         catch (NoClassDefFoundError e)
index b6667e4..3f21d17 100644 (file)
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.utils;
 
+import java.util.Collections;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +52,7 @@ public class NativeLibraryLinux implements NativeLibraryWrapper
     {
         try
         {
-            Native.register("c");
+            Native.register(com.sun.jna.NativeLibrary.getInstance("c", Collections.emptyMap()));
             available = true;
         }
         catch (NoClassDefFoundError e)
index e6e823c..b8304c7 100644 (file)
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.utils;
 
+import java.util.Collections;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +45,7 @@ public class NativeLibraryWindows implements NativeLibraryWrapper
     {
         try
         {
-            Native.register("kernel32");
+            Native.register(com.sun.jna.NativeLibrary.getInstance("kernel32", Collections.emptyMap()));
             available = true;
         }
         catch (NoClassDefFoundError e)
index 1a17a1f..3c1b7cc 100644 (file)
@@ -709,5 +709,10 @@ public final class Ref<T> implements RefCounted<T>
     {
         EXEC.shutdown();
         EXEC.awaitTermination(60, TimeUnit.SECONDS);
+        if (STRONG_LEAK_DETECTOR != null)
+        {
+            STRONG_LEAK_DETECTOR.shutdownNow();
+            STRONG_LEAK_DETECTOR.awaitTermination(60, TimeUnit.SECONDS);
+        }
     }
 }
index d03ef4f..04ea8b0 100644 (file)
@@ -28,7 +28,16 @@ import static org.apache.cassandra.net.MessagingService.Verb.READ_REPAIR;
 public class DistributedReadWritePathTest extends DistributedTestBase
 {
     @Test
-    public void coordinatorRead() throws Throwable
+    public void coordinatorReadTest() throws Throwable
+    {
+        for (int i = 0; i < 10; i++)
+        {
+            System.out.println(i);
+            coordinatorRead();
+        }
+    }
+
+    private void coordinatorRead() throws Throwable
     {
         try (TestCluster cluster = createCluster(3))
         {
index f344411..c68b961 100644 (file)
@@ -29,6 +29,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.LoggerContext;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.SharedExecutorPool;
@@ -53,6 +56,7 @@ import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.hints.HintsService;
 import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -64,6 +68,7 @@ import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.MessageInHandler;
+import org.apache.cassandra.net.async.NettyFactory;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -229,6 +234,11 @@ public class Instance extends InvokableInstance
             DatabaseDescriptor.createAllDirectories();
             Keyspace.setInitialized();
             SystemKeyspace.persistLocalMetadata();
+            // Even though we don't use MessagingService, access the static NettyFactory
+            // instance here so that we start the static event loop state
+            // (e.g. acceptGroup, inboundGroup, outboundGroup, etc ...). We can remove this
+            // once we actually use the MessagingService to communicate between nodes
+            NettyFactory.instance.getClass();
         }).accept(config);
     }
 
@@ -330,10 +340,10 @@ public class Instance extends InvokableInstance
         runOnInstance(() -> {
             Throwable error = null;
             error = runAndMergeThrowable(error,
+                    CompactionManager.instance::forceShutdown,
                     BatchlogManager.instance::shutdown,
                     HintsService.instance::shutdownBlocking,
                     CommitLog.instance::shutdownBlocking,
-                    CompactionManager.instance::forceShutdown,
                     Gossiper.instance::stop,
                     SecondaryIndexManager::shutdownExecutors,
                     MessagingService.instance()::shutdown,
@@ -347,8 +357,12 @@ public class Instance extends InvokableInstance
                     StageManager::shutdownAndWait,
                     SharedExecutorPool.SHARED::shutdown,
                     Memtable.MEMORY_POOL::shutdown,
-                    ScheduledExecutors::shutdownAndWait);
+                    ScheduledExecutors::shutdownAndWait,
+                    SSTableReader::shutdownBlocking);
+
             error = shutdownAndWait(error, ActiveRepairService.repairCommandExecutor);
+            LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
+            loggerContext.stop();
             Throwables.maybeFail(error);
         });
     }
@@ -357,25 +371,11 @@ public class Instance extends InvokableInstance
     {
         return runAndMergeThrowable(existing, () -> {
             executor.shutdownNow();
-            executor.awaitTermination(5, TimeUnit.SECONDS);
+            executor.awaitTermination(20, TimeUnit.SECONDS);
             assert executor.isTerminated() && executor.isShutdown() : executor;
         });
     }
 
-    private static Throwable runAndMergeThrowable(Throwable existing, ThrowingRunnable runnable)
-    {
-        try
-        {
-            runnable.run();
-        }
-        catch (Throwable t)
-        {
-            return Throwables.merge(existing, t);
-        }
-
-        return existing;
-    }
-
     private static Throwable runAndMergeThrowable(Throwable existing, ThrowingRunnable ... runnables)
     {
         for (ThrowingRunnable runnable : runnables)
index 2b979ee..cc6cf81 100644 (file)
@@ -31,16 +31,21 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
 
+import io.netty.util.concurrent.FastThreadLocal;
+import io.netty.util.concurrent.FastThreadLocalThread;
+import io.netty.util.internal.InternalThreadLocalMap;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
@@ -79,7 +84,7 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
  */
 public class TestCluster implements AutoCloseable
 {
-    private static ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("cluster-async-tasks"));
+    private final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("cluster-async-tasks"));
 
     private final File root;
     private final List<Instance> instances;
@@ -271,19 +276,37 @@ public class TestCluster implements AutoCloseable
     }
 
     @Override
-    public void close()
+    public void close() throws InterruptedException, TimeoutException, ExecutionException
     {
         List<Future<?>> futures = instances.stream()
                 .map(i -> exec.submit(i::shutdown))
                 .collect(Collectors.toList());
 
-//        withThreadLeakCheck(futures);
-
         // Make sure to only delete directory when threads are stopped
-        exec.submit(() -> {
+        Future combined = exec.submit(() -> {
             FBUtilities.waitOnFutures(futures);
             FileUtils.deleteRecursive(root);
         });
+
+        combined.get(60, TimeUnit.SECONDS);
+
+        exec.shutdownNow();
+        exec.awaitTermination(10, TimeUnit.SECONDS);
+
+        //withThreadLeakCheck(futures);
+        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+        for (Thread thread : threadSet)
+        {
+            if (thread instanceof FastThreadLocalThread)
+                ((FastThreadLocalThread)thread).setThreadLocalMap(null);
+        }
+
+        InternalThreadLocalMap.remove();
+        InternalThreadLocalMap.destroy();
+
+        FastThreadLocal.removeAll();
+        FastThreadLocal.destroy();
+        System.gc();
     }
 
     // We do not want this check to run every time until we fix problems with tread stops