IGNITE-10891 Fix IgnitePdsThreadInterruptionTest.testInterruptsOnLFSRead flaky in...
authorDmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
Tue, 25 Dec 2018 11:01:50 +0000 (14:01 +0300)
committerDmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
Fri, 11 Jan 2019 18:27:04 +0000 (21:27 +0300)
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsThreadInterruptionTest.java
modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java

index aa1b061..c04073e 100644 (file)
@@ -163,17 +163,23 @@ public class DelayedPageReplacementTracker {
                 if (!hasLockedPages)
                     return;
 
+                boolean interrupted = false;
+
                 while (locked.contains(id)) {
                     if (log.isDebugEnabled())
                         log.debug("Found replaced page [" + id + "] which is being written to page store, wait for finish replacement");
 
                     try {
+                        // Uninterruptable wait.
                         locked.wait();
                     }
                     catch (InterruptedException e) {
-                        throw new IgniteInterruptedException(e);
+                        interrupted = true;
                     }
                 }
+
+                if (interrupted)
+                    Thread.currentThread().interrupt();
             }
         }
 
index 59371e3..7c5fd81 100644 (file)
@@ -460,12 +460,13 @@ public final class X {
      * into check.
      *
      * @param t Throwable to check (if {@code null}, {@code false} is returned).
+     * @param msg Message text that should be in cause.
      * @param cls Cause classes to check (if {@code null} or empty, {@code false} is returned).
      * @return {@code True} if one of the causing exception is an instance of passed in classes,
      *      {@code false} otherwise.
      */
     @SafeVarargs
-    public static boolean hasCause(@Nullable Throwable t, @Nullable Class<?>... cls) {
+    public static boolean hasCause(@Nullable Throwable t,  @Nullable String msg, @Nullable Class<?>... cls) {
         if (t == null || F.isEmpty(cls))
             return false;
 
@@ -473,12 +474,20 @@ public final class X {
 
         for (Throwable th = t; th != null; th = th.getCause()) {
             for (Class<?> c : cls) {
-                if (c.isAssignableFrom(th.getClass()))
+                if (c.isAssignableFrom(th.getClass())) {
+                    if (msg != null) {
+                        if (th.getMessage() != null && th.getMessage().contains(msg))
+                            return true;
+                        else
+                            continue;
+                    }
+
                     return true;
+                }
             }
 
             for (Throwable n : th.getSuppressed()) {
-                if (hasCause(n, cls))
+                if (hasCause(n, msg, cls))
                     return true;
             }
 
@@ -490,6 +499,19 @@ public final class X {
     }
 
     /**
+     * Checks if passed in {@code 'Throwable'} has given class in {@code 'cause'} hierarchy <b>including</b> that
+     * throwable itself. <p> Note that this method follows includes {@link Throwable#getSuppressed()} into check.
+     *
+     * @param t Throwable to check (if {@code null}, {@code false} is returned).
+     * @param cls Cause classes to check (if {@code null} or empty, {@code false} is returned).
+     * @return {@code True} if one of the causing exception is an instance of passed in classes, {@code false}
+     * otherwise.
+     */
+    public static boolean hasCause(@Nullable Throwable t, @Nullable Class<?>... cls) {
+        return hasCause(t, null, cls);
+    }
+
+    /**
      * Checks if passed throwable has given class in one of the suppressed exceptions.
      *
      * @param t Throwable to check (if {@code null}, {@code false} is returned).
index 4b7db7d..9f7f791 100644 (file)
 
 package org.apache.ignite.internal.processors.cache.persistence.db.file;
 
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataPageEvictionMode;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -40,110 +50,95 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
     /** */
-    private static final int PAGE_SIZE = 1 << 12; // 4096
+    public static final int THREADS_CNT = 100;
 
     /** */
-    public static final int THREADS_CNT = 100;
+    private static final int VAL_LEN = 8192;
 
-    /**
-     * Cache name.
-     */
-    private final String CACHE_NAME = "cache";
+    /** */
+    private static final byte[] PAYLOAD = new byte[VAL_LEN];
 
     /** */
-    private volatile boolean stop = false;
+    private volatile boolean stop;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        final IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setDataStorageConfiguration(storageConfiguration());
-
-        CacheConfiguration ccfg = new CacheConfiguration<>(CACHE_NAME);
-
-        RendezvousAffinityFunction affinityFunction = new RendezvousAffinityFunction();
-        affinityFunction.setPartitions(1);
-
-        ccfg.setAffinity(affinityFunction);
-
-        cfg.setCacheConfiguration(ccfg);
-
-        return cfg;
-    }
-
-    /**
-     * @return DataStorage configuration.
-     */
-    private DataStorageConfiguration storageConfiguration() {
-        DataRegionConfiguration regionCfg = new DataRegionConfiguration()
-                .setInitialSize(10L * 1024L * 1024L)
-                .setMaxSize(10L * 1024L * 1024L)
-                .setPageEvictionMode(DataPageEvictionMode.RANDOM_LRU);
-
-        DataStorageConfiguration cfg = new DataStorageConfiguration()
-                .setWalMode(WALMode.LOG_ONLY)
-                .setWalFsyncDelayNanos(0)
-                .setPageSize(PAGE_SIZE)
-                .setFileIOFactory(new AsyncFileIOFactory());
-
-        cfg.setDefaultDataRegionConfiguration(regionCfg);
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setWalMode(WALMode.LOG_ONLY)
+            .setWalFsyncDelayNanos(0)
+            .setFileIOFactory(new AsyncFileIOFactory())
+            .setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration()
+                    .setPersistenceEnabled(true)
+                    .setInitialSize(10L * 1024L * 1024L)
+                    .setMaxSize(10L * 1024L * 1024L)
+            ));
+
+        cfg.setCacheConfiguration(
+            new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAffinity(new RendezvousAffinityFunction(false, 1))
+        );
 
         return cfg;
     }
 
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTestsStarted();
-
+    @Before
+    public void before() throws Exception {
         cleanPersistenceDir();
     }
 
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
+    @After
+    public void after() throws Exception {
         stopAllGrids();
 
         cleanPersistenceDir();
     }
 
     /**
-     * Tests interruptions on LFS read.
+     * Tests interruptions on read.
      *
      * @throws Exception If failed.
      */
     @Test
-    public void testInterruptsOnLFSRead() throws Exception {
-        final Ignite ignite = startGrid();
-
-        ignite.active(true);
+    public void testInterruptsOnRead() throws Exception {
+        Ignite ignite = startGrid();
 
-        final int valLen = 8192;
+        ignite.cluster().active(true);
 
-        final byte[] payload = new byte[valLen];
+        int maxKey = 10_000;
 
-        final int maxKey = 10_000;
+        Set<Integer> keysToCheck = new HashSet<>();
 
         Thread[] workers = new Thread[THREADS_CNT];
 
+        // Load data.
+        try (IgniteDataStreamer<Integer, byte[]> st = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
+            st.allowOverwrite(true);
 
-        final IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
-
-        for (int i=0; i < maxKey; i++)
-            cache.put(i, payload);
+            for (int i = 0; i < maxKey; i++){
+                keysToCheck.add(i);
 
-        final AtomicReference<Throwable> fail = new AtomicReference<>();
+                st.addData(i, PAYLOAD);
+            }
+        }
 
+        IgniteCache<Integer,  byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
-        Runnable clo = new Runnable() {
-            @Override public void run() {
-                cache.get(ThreadLocalRandom.current().nextInt(maxKey / 5));
-            }
-        };
+        AtomicReference<Throwable> fail = new AtomicReference<>();
 
         for (int i = 0; i < workers.length; i++) {
-            workers[i] = new Thread(clo);
+            workers[i] = new Thread(() -> cache.get(ThreadLocalRandom.current().nextInt(maxKey / 5)));
+
             workers[i].setName("reader-" + i);
-            workers[i].setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-                @Override public void uncaughtException(Thread t, Throwable e) {
+
+            workers[i].setUncaughtExceptionHandler((t, e) -> {
+                // We can get IgniteInterruptedException on GridCacheAdapter.asyncOpsSem if thread was interrupted
+                // before asyncOpsSem.acquire().
+                if (!X.hasCause(e,
+                    "Failed to wait for asynchronous operation permit",
+                    IgniteInterruptedException.class)) {
                     fail.compareAndSet(null, e);
                 }
             });
@@ -152,15 +147,11 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
         for (Thread worker : workers)
             worker.start();
 
-        //Thread.sleep(3_000);
-
         // Interrupts should not affect reads.
         for (int i = 0;i < workers.length / 2; i++)
             workers[i].interrupt();
 
-        Thread.sleep(3_000);
-
-        stop = true;
+        U.sleep(3_000);
 
         for (Thread worker : workers)
             worker.join();
@@ -171,53 +162,61 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
 
         int verifiedKeys = 0;
 
+        // Get all keys.
+        Map<Integer, byte[]> res = cache.getAll(keysToCheck);
+
+        Assert.assertEquals(maxKey, keysToCheck.size());
+        Assert.assertEquals(maxKey, res.size());
+
         // Post check.
-        for (int i = 0; i < maxKey; i++) {
-            byte[] val = (byte[]) cache.get(i);
+        for (Integer key: keysToCheck) {
+            byte[] val = res.get(key);
 
-            if (val != null) {
-                assertEquals("Illegal length", valLen, val.length);
+            assertNotNull(val);
+            assertEquals("Illegal length", VAL_LEN, val.length);
 
-                verifiedKeys++;
-            }
+            verifiedKeys++;
         }
 
+        Assert.assertEquals(maxKey, verifiedKeys);
+
         log.info("Verified keys: " + verifiedKeys);
     }
 
     /**
      * Tests interruptions on WAL write.
      *
-     * @throws Exception
+     * @throws Exception If failed.
      */
     @Test
     public void testInterruptsOnWALWrite() throws Exception {
-        final Ignite ignite = startGrid();
+        Ignite ignite = startGrid();
 
-        ignite.active(true);
+        ignite.cluster().active(true);
 
-        final int valLen = 8192;
+        int maxKey = 100_000;
 
-        final byte[] payload = new byte[valLen];
-
-        final int maxKey = 100_000;
+        Set<Integer> keysToCheck = new GridConcurrentHashSet<>();
 
         Thread[] workers = new Thread[THREADS_CNT];
 
-        final AtomicReference<Throwable> fail = new AtomicReference<>();
+        AtomicReference<Throwable> fail = new AtomicReference<>();
 
-        Runnable clo = new Runnable() {
-            @Override public void run() {
-                IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
+        for (int i = 0; i < workers.length; i++) {
+            workers[i] = new Thread(() -> {
+                IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
-                while (!stop)
-                    cache.put(ThreadLocalRandom.current().nextInt(maxKey), payload);
-            }
-        };
+                while (!stop) {
+                    int key = ThreadLocalRandom.current().nextInt(maxKey);
+
+                    cache.put(key, PAYLOAD);
+
+                    keysToCheck.add(key);
+                }
+            });
 
-        for (int i = 0; i < workers.length; i++) {
-            workers[i] = new Thread(clo);
             workers[i].setName("writer-" + i);
+
             workers[i].setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                 @Override public void uncaughtException(Thread t, Throwable e) {
                     fail.compareAndSet(null, e);
@@ -245,19 +244,22 @@ public class IgnitePdsThreadInterruptionTest extends GridCommonAbstractTest {
 
         assertNull(t);
 
-        IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
+        IgniteCache<Integer, byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
         int verifiedKeys = 0;
 
+        Map<Integer, byte[]> res = cache.getAll(keysToCheck);
+
+        Assert.assertEquals(res.size(), keysToCheck.size());
+
         // Post check.
-        for (int i = 0; i < maxKey; i++) {
-            byte[] val = (byte[]) cache.get(i);
+        for (Integer key: keysToCheck) {
+            byte[] val = res.get(key);
 
-            if (val != null) {
-                assertEquals("Illegal length", valLen, val.length);
+            assertNotNull(val);
+            assertEquals("Illegal length", VAL_LEN, val.length);
 
-                verifiedKeys++;
-            }
+            verifiedKeys++;
         }
 
         log.info("Verified keys: " + verifiedKeys);
index e6a3949..3817994 100644 (file)
@@ -103,6 +103,7 @@ import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionRollbackException;
 import org.apache.ignite.transactions.TransactionTimeoutException;
 import org.jetbrains.annotations.NotNull;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -691,6 +692,7 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest {
      * Simulate uncommitted backup transactions and test rolling back using utility.
      */
     @Test
+    @Ignore("https://issues.apache.org/jira/browse/IGNITE-10899")
     public void testKillHangingRemoteTransactions() throws Exception {
         final int cnt = 3;