IGNITE-7389 DataStreamer hangs if exception was thrown during addData which isn't...
authorilantukh <ilantukh@gridgain.com>
Thu, 1 Feb 2018 15:42:42 +0000 (18:42 +0300)
committerAnton Vinogradov <av@apache.org>
Thu, 1 Feb 2018 15:42:42 +0000 (18:42 +0300)
Signed-off-by: Anton Vinogradov <av@apache.org>
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java

index 18abcd8..4771c3e 100644 (file)
@@ -286,7 +286,7 @@ public class DynamicCacheDescriptor {
 
 
     /**
-     * @return Start topology version.
+     * @return Start topology version or {@code null} if cache configured statically.
      */
     @Nullable public AffinityTopologyVersion startTopologyVersion() {
         return startTopVer;
index c71d129..7d6a8d3 100644 (file)
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -81,7 +82,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheGateway;
 import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.IgniteCacheFutureImpl;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
-import org.apache.ignite.internal.processors.cache.IgniteFinishedCacheFutureImpl;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -365,6 +365,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             ctx.grid().getOrCreateCache(ccfg);
         }
+
+        ensureCacheStarted();
     }
 
     /**
@@ -589,9 +591,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         catch (IgniteDataStreamerTimeoutException e) {
             throw e;
         }
-        catch (IgniteException e) {
-            return new IgniteFinishedCacheFutureImpl<>(e);
-        }
         finally {
             leaveBusy();
         }
@@ -670,16 +669,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         else
             checkSecurityPermission(SecurityPermission.CACHE_PUT);
 
-        KeyCacheObject key0;
-        CacheObject val0;
-
-        try {
-            key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, key, true);
-            val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true);
-        }
-        catch (Exception e) {
-            return new IgniteFinishedCacheFutureImpl<>(e);
-        }
+        KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, key, true);
+        CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true);
 
         return addDataInternal(Collections.singleton(new DataStreamerEntry(key0, val0)));
     }
@@ -1337,6 +1328,30 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
+     * Ensures that cache has been started and is ready to store streamed data.
+     */
+    private void ensureCacheStarted() {
+        DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName);
+
+        assert desc != null;
+
+        if (desc.startTopologyVersion() == null)
+            return;
+
+        IgniteInternalFuture<?> affReadyFut = ctx.cache().context().exchange()
+            .affinityReadyFuture(desc.startTopologyVersion());
+
+        if (affReadyFut != null) {
+            try {
+                affReadyFut.get();
+            }
+            catch (IgniteCheckedException ex) {
+                throw new IgniteException(ex);
+            }
+        }
+    }
+
+    /**
      *
      */
     private class Buffer {
index 940f8ce..d277b2e 100644 (file)
@@ -20,14 +20,12 @@ package org.apache.ignite.internal.processors.datastreamer;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.CacheException;
@@ -56,7 +54,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.ignite.transactions.TransactionException;
 import org.apache.log4j.Appender;
 import org.apache.log4j.Logger;
 import org.apache.log4j.SimpleLayout;
@@ -491,6 +488,25 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testDataStreamerWaitsUntilDynamicCacheStartIsFinished() throws Exception {
+        final Ignite ignite0 = startGrids(2);
+        final Ignite ignite1 = grid(1);
+
+        final String cacheName = "testCache";
+
+        IgniteCache<Integer, Integer> cache = ignite0.getOrCreateCache(
+            new CacheConfiguration<Integer, Integer>().setName(cacheName));
+
+        try (IgniteDataStreamer<Integer, Integer> ldr = ignite1.dataStreamer(cacheName)) {
+            ldr.addData(0, 0);
+        }
+
+        assertEquals(Integer.valueOf(0), cache.get(0));
+    }
+
+    /**
      * Gets cache configuration.
      *
      * @return Cache configuration.