import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IgniteCacheFutureImpl;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
-import org.apache.ignite.internal.processors.cache.IgniteFinishedCacheFutureImpl;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
ctx.grid().getOrCreateCache(ccfg);
}
+
+ ensureCacheStarted();
}
/**
catch (IgniteDataStreamerTimeoutException e) {
throw e;
}
- catch (IgniteException e) {
- return new IgniteFinishedCacheFutureImpl<>(e);
- }
finally {
leaveBusy();
}
else
checkSecurityPermission(SecurityPermission.CACHE_PUT);
- KeyCacheObject key0;
- CacheObject val0;
-
- try {
- key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, key, true);
- val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true);
- }
- catch (Exception e) {
- return new IgniteFinishedCacheFutureImpl<>(e);
- }
+ KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, key, true);
+ CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true);
return addDataInternal(Collections.singleton(new DataStreamerEntry(key0, val0)));
}
}
/**
+ * Ensures that cache has been started and is ready to store streamed data.
+ */
+ private void ensureCacheStarted() {
+ DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName);
+
+ assert desc != null;
+
+ if (desc.startTopologyVersion() == null)
+ return;
+
+ IgniteInternalFuture<?> affReadyFut = ctx.cache().context().exchange()
+ .affinityReadyFuture(desc.startTopologyVersion());
+
+ if (affReadyFut != null) {
+ try {
+ affReadyFut.get();
+ }
+ catch (IgniteCheckedException ex) {
+ throw new IgniteException(ex);
+ }
+ }
+ }
+
+ /**
*
*/
private class Buffer {
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.ignite.transactions.TransactionException;
import org.apache.log4j.Appender;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
}
/**
+ * @throws Exception If failed.
+ */
+ public void testDataStreamerWaitsUntilDynamicCacheStartIsFinished() throws Exception {
+ final Ignite ignite0 = startGrids(2);
+ final Ignite ignite1 = grid(1);
+
+ final String cacheName = "testCache";
+
+ IgniteCache<Integer, Integer> cache = ignite0.getOrCreateCache(
+ new CacheConfiguration<Integer, Integer>().setName(cacheName));
+
+ try (IgniteDataStreamer<Integer, Integer> ldr = ignite1.dataStreamer(cacheName)) {
+ ldr.addData(0, 0);
+ }
+
+ assertEquals(Integer.valueOf(0), cache.get(0));
+ }
+
+ /**
* Gets cache configuration.
*
* @return Cache configuration.