IGNITE-8911 Fixed while cache is restarting it's possible to start new cache with...
authorEdShangGG <eshangareev@gridgain.com>
Sat, 29 Dec 2018 13:58:23 +0000 (16:58 +0300)
committerDmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
Sat, 29 Dec 2018 13:58:23 +0000 (16:58 +0300)
Signed-off-by: Dmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
30 files changed:
modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
modules/core/src/main/java/org/apache/ignite/IgniteCacheRestartingException.java
modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/TxTopologyVersionFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/AtomicDataStructureProxy.java
modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java
modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsCacheStartStopWithFreqCheckpointTest.java
modules/geospatial/src/test/java/org/apache/ignite/internal/processors/query/h2/H2IndexingAbstractGeoSelfTest.java
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java

index 636a717..0c3a885 100644 (file)
@@ -131,7 +131,7 @@ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstrac
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
-        ((IgniteEx)ignite(0)).context().cache().dynamicDestroyCache(DEFAULT_CACHE_NAME, true, true, false);
+        ((IgniteEx)ignite(0)).context().cache().dynamicDestroyCache(DEFAULT_CACHE_NAME, true, true, false, null);
 
         if (conn != null) {
             conn.close();
index a3a7490..1dbfc67 100644 (file)
@@ -18,6 +18,7 @@
 
 package org.apache.ignite;
 
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
 
@@ -29,26 +30,34 @@ public class IgniteCacheRestartingException extends IgniteException {
     private static final long serialVersionUID = 0L;
 
     /** */
-    private final IgniteFuture<?> restartFut;
+    private final transient IgniteFuture<?> restartFut;
+
+    /**
+     * @param cacheName Error message.
+     */
+    public IgniteCacheRestartingException(String cacheName) {
+        this(null, cacheName, null);
+    }
 
     /**
      * @param restartFut Restart future.
-     * @param msg Error message.
+     * @param cacheName Error message.
      */
-    public IgniteCacheRestartingException(IgniteFuture<?> restartFut, String msg) {
-        this(restartFut, msg, null);
+    public IgniteCacheRestartingException(IgniteFuture<?> restartFut, String cacheName) {
+        this(restartFut, cacheName, null);
     }
 
     /**
      * @param restartFut Restart future.
-     * @param msg Error message.
+     * @param cacheName Cache name what is restarting.
      * @param cause Optional nested exception (can be {@code null}).
      */
     public IgniteCacheRestartingException(
         IgniteFuture<?> restartFut,
-        String msg,
-        @Nullable Throwable cause) {
-        super(msg, cause);
+        String cacheName,
+        @Nullable Throwable cause
+    ) {
+        super("Cache is restarting:" + cacheName + ", you could wait restart completion with restartFuture", cause);
 
         this.restartFut = restartFut;
     }
index b84771a..3a3af8e 100644 (file)
@@ -3039,7 +3039,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
             Boolean res = false;
 
-            if (ctx.cache().cache(cacheName) == null) {
+            IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName, false, true);
+
+            if (cache == null) {
                 res =
                     sql ? ctx.cache().dynamicStartSqlCache(cacheCfg).get() :
                         ctx.cache().dynamicStartCache(cacheCfg,
@@ -3048,9 +3050,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                             false,
                             true,
                             true).get();
-            }
 
-            return new IgniteBiTuple<>(ctx.cache().publicJCache(cacheName), res);
+                return new IgniteBiTuple<>(ctx.cache().publicJCache(cacheName), res);
+            }
+            else
+                return new IgniteBiTuple<>(cache, res);
         }
         catch (IgniteCheckedException e) {
             throw CU.convertToCacheException(e);
@@ -3298,7 +3302,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         try {
             checkClusterState();
 
-            return ctx.cache().dynamicDestroyCache(cacheName, sql, checkThreadTx, false);
+            return ctx.cache().dynamicDestroyCache(cacheName, sql, checkThreadTx, false, null);
         }
         finally {
             unguard();
@@ -3318,7 +3322,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         try {
             checkClusterState();
 
-            return ctx.cache().dynamicDestroyCaches(cacheNames, checkThreadTx, false);
+            return ctx.cache().dynamicDestroyCaches(cacheNames, checkThreadTx);
         }
         finally {
             unguard();
@@ -3334,10 +3338,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         try {
             checkClusterState();
 
-            if (ctx.cache().cache(cacheName) == null)
+            IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName, false, true);
+
+            if (cache == null) {
                 ctx.cache().getOrCreateFromTemplate(cacheName, true).get();
 
-            return ctx.cache().publicJCache(cacheName);
+                return ctx.cache().publicJCache(cacheName);
+            }
+
+            return cache;
         }
         catch (IgniteCheckedException e) {
             throw CU.convertToCacheException(e);
index b97d12f..e3b67f2 100644 (file)
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.query.QuerySchema;
 import org.apache.ignite.internal.processors.query.QuerySchemaPatch;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridFunc;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -79,6 +78,9 @@ import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType
  * Logic related to cache discovery data processing.
  */
 class ClusterCachesInfo {
+    /** Representation of null for restarting caches map */
+    private static final IgniteUuid NULL_OBJECT = new IgniteUuid();
+
     /** Version since which merge of config is supports. */
     private static final IgniteProductVersion V_MERGE_CONFIG_SINCE = IgniteProductVersion.fromString("2.5.0");
 
@@ -94,8 +96,8 @@ class ClusterCachesInfo {
     /** Cache templates. */
     private final ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>();
 
-    /** Caches currently being restarted. */
-    private final Set<String> restartingCaches = new GridConcurrentHashSet<>();
+    /** Caches currently being restarted (with restarter id). */
+    private final ConcurrentHashMap<String, IgniteUuid> restartingCaches = new ConcurrentHashMap<>();
 
     /** */
     private final IgniteLogger log;
@@ -411,7 +413,7 @@ class ClusterCachesInfo {
             requests.add(DynamicCacheChangeRequest.stopRequest(ctx, cacheName, cacheDescr.sql(), true));
         }
 
-        processCacheChangeRequests(exchangeActions, requests, topVer,false);
+        processCacheChangeRequests(exchangeActions, requests, topVer, false);
 
         failMsg.exchangeActions(exchangeActions);
     }
@@ -468,296 +470,399 @@ class ClusterCachesInfo {
         ExchangeActions exchangeActions,
         Collection<DynamicCacheChangeRequest> reqs,
         AffinityTopologyVersion topVer,
-        boolean persistedCfgs) {
+        boolean persistedCfgs
+    ) {
         CacheChangeProcessResult res = new CacheChangeProcessResult();
 
         final List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete = new ArrayList<>();
 
-        for (DynamicCacheChangeRequest req : reqs) {
-            if (req.template()) {
-                CacheConfiguration ccfg = req.startCacheConfiguration();
+        for (DynamicCacheChangeRequest req : reqs)
+            processCacheChangeRequest0(req, exchangeActions, topVer, persistedCfgs, res, reqsToComplete);
 
-                assert ccfg != null : req;
+        if (!F.isEmpty(res.addedDescs)) {
+            AffinityTopologyVersion startTopVer = res.needExchange ? topVer.nextMinorVersion() : topVer;
 
-                DynamicCacheDescriptor desc = registeredTemplates.get(req.cacheName());
+            for (DynamicCacheDescriptor desc : res.addedDescs) {
+                assert desc.template() || res.needExchange;
 
-                if (desc == null) {
-                    DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx,
-                        ccfg,
-                        req.cacheType(),
-                        null,
-                        true,
-                        req.initiatingNodeId(),
-                        false,
-                        false,
-                        req.deploymentId(),
-                        req.schema());
+                desc.startTopologyVersion(startTopVer);
+            }
+        }
 
-                    DynamicCacheDescriptor old = registeredTemplates().put(ccfg.getName(), templateDesc);
+        if (!F.isEmpty(reqsToComplete)) {
+            ctx.closure().callLocalSafe(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    for (T2<DynamicCacheChangeRequest, AffinityTopologyVersion> t : reqsToComplete) {
+                        final DynamicCacheChangeRequest req = t.get1();
+                        AffinityTopologyVersion waitTopVer = t.get2();
 
-                    assert old == null;
+                        IgniteInternalFuture<?> fut = waitTopVer != null ?
+                            ctx.cache().context().exchange().affinityReadyFuture(waitTopVer) : null;
 
-                    res.addedDescs.add(templateDesc);
+                        if (fut == null || fut.isDone())
+                            ctx.cache().completeCacheStartFuture(req, false, null);
+                        else {
+                            fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> fut) {
+                                    ctx.cache().completeCacheStartFuture(req, false, null);
+                                }
+                            });
+                        }
+                    }
+
+                    return null;
                 }
+            });
+        }
 
-                if (!persistedCfgs)
-                    ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
+        return res;
+    }
 
-                continue;
-            }
+    /**
+     * @param req Cache change request.
+     * @param exchangeActions Exchange actions to update.
+     * @param topVer Topology version.
+     * @param persistedCfgs {@code True} if process start of persisted caches during cluster activation.
+     * @param res Accumulator for cache change process results.
+     * @param reqsToComplete Accumulator for cache change requests which should be completed after
+     * ({@link org.apache.ignite.internal.processors.cache.GridCacheProcessor#pendingFuts}
+     */
+    private void processCacheChangeRequest0(
+        DynamicCacheChangeRequest req,
+        ExchangeActions exchangeActions,
+        AffinityTopologyVersion topVer,
+        boolean persistedCfgs,
+        CacheChangeProcessResult res,
+        List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete
+    ) {
+        String cacheName = req.cacheName();
 
-            assert !req.clientStartOnly() : req;
+        if (req.template()) {
+            processTemplateAddRequest(persistedCfgs, res, req);
 
-            DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName());
+            return;
+        }
 
-            boolean needExchange = false;
+        assert !req.clientStartOnly() : req;
 
-            boolean clientCacheStart = false;
+        DynamicCacheDescriptor desc = registeredCaches.get(cacheName);
 
-            AffinityTopologyVersion waitTopVer = null;
+        boolean needExchange = false;
 
-            if (req.start()) {
-                // Starting a new cache.
-                if (desc == null) {
-                    String conflictErr = checkCacheConflict(req.startCacheConfiguration());
+        boolean clientCacheStart = false;
 
-                    if (conflictErr != null) {
-                        U.warn(log, "Ignore cache start request. " + conflictErr);
+        AffinityTopologyVersion waitTopVer = null;
 
-                        IgniteCheckedException err = new IgniteCheckedException("Failed to start " +
-                            "cache. " + conflictErr);
+        if (req.start()) {
+            boolean proceedFuther = true;
 
-                        if (persistedCfgs)
-                            res.errs.add(err);
-                        else
-                            ctx.cache().completeCacheStartFuture(req, false, err);
+            if (restartingCaches.containsKey(cacheName) &&
+                ((req.restartId() == null && restartingCaches.get(cacheName) != NULL_OBJECT)
+                    || (req.restartId() != null &&!req.restartId().equals(restartingCaches.get(cacheName))))) {
 
-                        continue;
-                    }
+                if (req.failIfExists()) {
+                    ctx.cache().completeCacheStartFuture(req, false,
+                        new CacheExistsException("Failed to start cache (a cache is restarting): " + cacheName));
+                }
 
-                    if (req.clientStartOnly()) {
-                        assert !persistedCfgs;
+                proceedFuther = false;
+            }
 
-                        ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " +
-                            "client cache (a cache with the given name is not started): " + req.cacheName()));
-                    }
-                    else {
-                        SchemaOperationException err = QueryUtils.checkQueryEntityConflicts(
-                            req.startCacheConfiguration(), registeredCaches.values());
+            if (proceedFuther) {
+                if (desc == null) { /* Starting a new cache.*/
+                    if (!processStartNewCacheRequest(exchangeActions, topVer, persistedCfgs, res, req, cacheName))
+                        return;
 
-                        if (err != null) {
-                            if (persistedCfgs)
-                                res.errs.add(err);
+                    needExchange = true;
+                }
+                else {
+                    clientCacheStart = processStartAlreadyStartedCacheRequest(topVer, persistedCfgs, req, cacheName, desc);
+
+                    if (!clientCacheStart) {
+                        if (desc.clientCacheStartVersion() != null)
+                            waitTopVer = desc.clientCacheStartVersion();
+                        else {
+                            AffinityTopologyVersion nodeStartVer =
+                                new AffinityTopologyVersion(ctx.discovery().localNode().order(), 0);
+
+                            if (desc.startTopologyVersion() != null)
+                                waitTopVer = desc.startTopologyVersion();
                             else
-                                ctx.cache().completeCacheStartFuture(req, false, err);
+                                waitTopVer = desc.receivedFromStartVersion();
 
-                            continue;
+                            if (waitTopVer == null || nodeStartVer.compareTo(waitTopVer) > 0)
+                                waitTopVer = nodeStartVer;
                         }
+                    }
+                }
+            }
+        }
+        else if (req.resetLostPartitions()) {
+            if (desc != null) {
+                needExchange = true;
 
-                        CacheConfiguration<?, ?> ccfg = req.startCacheConfiguration();
+                exchangeActions.addCacheToResetLostPartitions(req, desc);
+            }
+        }
+        else if (req.stop()) {
+            if (desc != null) {
+                if (req.sql() && !desc.sql()) {
+                    ctx.cache().completeCacheStartFuture(req, false,
+                        new IgniteCheckedException("Only cache created with CREATE TABLE may be removed with " +
+                            "DROP TABLE [cacheName=" + cacheName + ']'));
 
-                        assert req.cacheType() != null : req;
-                        assert F.eq(ccfg.getName(), req.cacheName()) : req;
+                    return;
+                }
 
-                        int cacheId = CU.cacheId(req.cacheName());
+                processStopCacheRequest(exchangeActions, req, cacheName, desc);
 
-                        CacheGroupDescriptor grpDesc = registerCacheGroup(exchangeActions,
-                            topVer,
-                            ccfg,
-                            cacheId,
-                            req.initiatingNodeId(),
-                            req.deploymentId(),
-                            req.encryptionKey());
+                needExchange = true;
+            }
+        }
+        else
+            assert false : req;
 
-                        DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx,
-                            ccfg,
-                            req.cacheType(),
-                            grpDesc,
-                            false,
-                            req.initiatingNodeId(),
-                            false,
-                            req.sql(),
-                            req.deploymentId(),
-                            req.schema());
+        if (!needExchange) {
+            if (!clientCacheStart && ctx.localNodeId().equals(req.initiatingNodeId()))
+                reqsToComplete.add(new T2<>(req, waitTopVer));
+        }
+        else
+            res.needExchange = true;
+    }
 
-                        DynamicCacheDescriptor old = registeredCaches.put(ccfg.getName(), startDesc);
+    /**
+     * @param req Cache change request.
+     * @param exchangeActions Exchange actions to update.
+     * @param cacheName Cache name.
+     * @param desc Dynamic cache descriptor.
+     */
+    private void processStopCacheRequest(
+        ExchangeActions exchangeActions,
+        DynamicCacheChangeRequest req,
+        String cacheName,
+        DynamicCacheDescriptor desc
+    ) {
+        DynamicCacheDescriptor old = registeredCaches.remove(cacheName);
 
-                        restartingCaches.remove(ccfg.getName());
+        if (req.restart()) {
+            IgniteUuid restartId = req.restartId();
 
-                        assert old == null;
+            restartingCaches.put(cacheName, restartId == null ? NULL_OBJECT : restartId);
+        }
 
-                        ctx.discovery().setCacheFilter(
-                            startDesc.cacheId(),
-                            grpDesc.groupId(),
-                            ccfg.getName(),
-                            ccfg.getNearConfiguration() != null);
+        assert old != null && old == desc : "Dynamic cache map was concurrently modified [req=" + req + ']';
 
-                        if (!persistedCfgs) {
-                            ctx.discovery().addClientNode(req.cacheName(),
-                                req.initiatingNodeId(),
-                                req.nearCacheConfiguration() != null);
-                        }
+        ctx.discovery().removeCacheFilter(cacheName);
 
-                        res.addedDescs.add(startDesc);
+        exchangeActions.addCacheToStop(req, desc);
 
-                        exchangeActions.addCacheToStart(req, startDesc);
+        CacheGroupDescriptor grpDesc = registeredCacheGrps.get(desc.groupId());
 
-                        needExchange = true;
-                    }
-                }
-                else {
-                    assert !persistedCfgs;
-                    assert req.initiatingNodeId() != null : req;
+        assert grpDesc != null && grpDesc.groupId() == desc.groupId() : desc;
 
-                    if (req.failIfExists()) {
-                        ctx.cache().completeCacheStartFuture(req, false,
-                            new CacheExistsException("Failed to start cache " +
-                                "(a cache with the same name is already started): " + req.cacheName()));
-                    }
-                    else {
-                        // Cache already exists, it is possible client cache is needed.
-                        ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
+        grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId());
 
-                        boolean clientReq = node != null &&
-                            !ctx.discovery().cacheAffinityNode(node, req.cacheName());
+        if (!grpDesc.hasCaches()) {
+            registeredCacheGrps.remove(grpDesc.groupId());
 
-                        if (clientReq) {
-                            ctx.discovery().addClientNode(req.cacheName(),
-                                req.initiatingNodeId(),
-                                req.nearCacheConfiguration() != null);
+            ctx.discovery().removeCacheGroup(grpDesc);
 
-                            if (node.id().equals(req.initiatingNodeId())) {
-                                desc.clientCacheStartVersion(topVer);
+            exchangeActions.addCacheGroupToStop(grpDesc, req.destroy());
 
-                                clientCacheStart = true;
+            assert exchangeActions.checkStopRequestConsistency(grpDesc.groupId());
 
-                                ctx.discovery().clientCacheStartEvent(req.requestId(), F.asMap(req.cacheName(), req), null);
-                            }
-                        }
-                    }
+            // If all caches in group will be destroyed it is not necessary to destroy single cache
+            // because group will be stopped anyway.
+            if (req.destroy()) {
+                for (ExchangeActions.CacheActionData action : exchangeActions.cacheStopRequests()) {
+                    if (action.descriptor().groupId() == grpDesc.groupId())
+                        action.request().destroy(false);
                 }
+            }
+        }
+    }
 
-                if (!needExchange && !clientCacheStart && desc != null) {
-                    if (desc.clientCacheStartVersion() != null)
-                        waitTopVer = desc.clientCacheStartVersion();
-                    else {
-                        AffinityTopologyVersion nodeStartVer =
-                            new AffinityTopologyVersion(ctx.discovery().localNode().order(), 0);
+    /**
+     * @param persistedCfgs {@code True} if process start of persisted caches during cluster activation.
+     * @param res Accumulator for cache change process results.
+     * @param req Dynamic cache change request.
+     */
+    private void processTemplateAddRequest(
+        boolean persistedCfgs,
+        CacheChangeProcessResult res,
+        DynamicCacheChangeRequest req
+    ) {
+        CacheConfiguration ccfg = req.startCacheConfiguration();
 
-                        if (desc.startTopologyVersion() != null)
-                            waitTopVer = desc.startTopologyVersion();
-                        else
-                            waitTopVer = desc.receivedFromStartVersion();
+        assert ccfg != null : req;
 
-                        if (waitTopVer == null || nodeStartVer.compareTo(waitTopVer) > 0)
-                            waitTopVer = nodeStartVer;
-                    }
-                }
-            }
-            else if (req.resetLostPartitions()) {
-                if (desc != null) {
-                    needExchange = true;
+        DynamicCacheDescriptor desc = registeredTemplates.get(req.cacheName());
 
-                    exchangeActions.addCacheToResetLostPartitions(req, desc);
-                }
-            }
-            else if (req.stop()) {
-                if (desc != null) {
-                    if (req.sql() && !desc.sql()) {
-                        ctx.cache().completeCacheStartFuture(req, false,
-                            new IgniteCheckedException("Only cache created with CREATE TABLE may be removed with " +
-                                "DROP TABLE [cacheName=" + req.cacheName() + ']'));
-
-                        continue;
-                    }
+        if (desc == null) {
+            DynamicCacheDescriptor templateDesc = new DynamicCacheDescriptor(ctx,
+                ccfg,
+                req.cacheType(),
+                null,
+                true,
+                req.initiatingNodeId(),
+                false,
+                false,
+                req.deploymentId(),
+                req.schema());
 
-                    DynamicCacheDescriptor old = registeredCaches.remove(req.cacheName());
+            DynamicCacheDescriptor old = registeredTemplates().put(ccfg.getName(), templateDesc);
 
-                    if (req.restart())
-                        restartingCaches.add(req.cacheName());
+            assert old == null;
 
-                    assert old != null && old == desc : "Dynamic cache map was concurrently modified [req=" + req + ']';
+            res.addedDescs.add(templateDesc);
+        }
 
-                    ctx.discovery().removeCacheFilter(req.cacheName());
+        if (!persistedCfgs)
+            ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId());
+    }
 
-                    needExchange = true;
+    /**
+     * @param topVer Topology version.
+     * @param persistedCfgs {@code True} if process start of persisted caches during cluster activation.
+     * @param req Cache change request.
+     * @param cacheName Cache name.
+     * @param desc Dynamic cache descriptor.
+     * @return True if it is needed to start client cache.
+     */
+    private boolean processStartAlreadyStartedCacheRequest(
+        AffinityTopologyVersion topVer,
+        boolean persistedCfgs,
+        DynamicCacheChangeRequest req,
+        String cacheName,
+        DynamicCacheDescriptor desc
+    ) {
+        assert !persistedCfgs;
+        assert req.initiatingNodeId() != null : req;
+
+        if (req.failIfExists()) {
+            ctx.cache().completeCacheStartFuture(req, false,
+                new CacheExistsException("Failed to start cache " +
+                    "(a cache with the same name is already started): " + cacheName));
+        }
+        else {
+            // Cache already exists, it is possible client cache is needed.
+            ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
 
-                    exchangeActions.addCacheToStop(req, desc);
+            boolean clientReq = node != null &&
+                !ctx.discovery().cacheAffinityNode(node, cacheName);
 
-                    CacheGroupDescriptor grpDesc = registeredCacheGrps.get(desc.groupId());
+            if (clientReq) {
+                ctx.discovery().addClientNode(cacheName,
+                    req.initiatingNodeId(),
+                    req.nearCacheConfiguration() != null);
 
-                    assert grpDesc != null && grpDesc.groupId() == desc.groupId() : desc;
+                if (node.id().equals(req.initiatingNodeId())) {
+                    desc.clientCacheStartVersion(topVer);
 
-                    grpDesc.onCacheStopped(desc.cacheName(), desc.cacheId());
+                    ctx.discovery().clientCacheStartEvent(req.requestId(), F.asMap(cacheName, req), null);
 
-                    if (!grpDesc.hasCaches()) {
-                        registeredCacheGrps.remove(grpDesc.groupId());
+                    return true;
+                }
+            }
+        }
 
-                        ctx.discovery().removeCacheGroup(grpDesc);
+        return false;
+    }
 
-                        exchangeActions.addCacheGroupToStop(grpDesc, req.destroy());
+    /**
+     * @param exchangeActions Exchange actions to update.
+     * @param topVer Topology version.
+     * @param persistedCfgs {@code True} if process start of persisted caches during cluster activation.
+     * @param res Accumulator for cache change process results.
+     * @param req Cache change request.
+     * @param cacheName Cache name.
+     * @return True if there was no errors.
+     */
+    private boolean processStartNewCacheRequest(
+        ExchangeActions exchangeActions,
+        AffinityTopologyVersion topVer,
+        boolean persistedCfgs,
+        CacheChangeProcessResult res,
+        DynamicCacheChangeRequest req,
+        String cacheName
+    ) {
+        String conflictErr = checkCacheConflict(req.startCacheConfiguration());
 
-                        assert exchangeActions.checkStopRequestConsistency(grpDesc.groupId());
+        if (conflictErr != null) {
+            U.warn(log, "Ignore cache start request. " + conflictErr);
 
-                        // If all caches in group will be destroyed it is not necessary to destroy single cache
-                        // because group will be stopped anyway.
-                        if (req.destroy()) {
-                            for (ExchangeActions.CacheActionData action : exchangeActions.cacheStopRequests()) {
-                                if (action.descriptor().groupId() == grpDesc.groupId())
-                                    action.request().destroy(false);
-                            }
-                        }
-                    }
-                }
-            }
-            else
-                assert false : req;
+            IgniteCheckedException err = new IgniteCheckedException("Failed to start " +
+                "cache. " + conflictErr);
 
-            if (!needExchange) {
-                if (!clientCacheStart && ctx.localNodeId().equals(req.initiatingNodeId()))
-                    reqsToComplete.add(new T2<>(req, waitTopVer));
-            }
+            if (persistedCfgs)
+                res.errs.add(err);
             else
-                res.needExchange = true;
+                ctx.cache().completeCacheStartFuture(req, false, err);
+
+            return false;
         }
 
-        if (!F.isEmpty(res.addedDescs)) {
-            AffinityTopologyVersion startTopVer = res.needExchange ? topVer.nextMinorVersion() : topVer;
+        SchemaOperationException err = QueryUtils.checkQueryEntityConflicts(
+            req.startCacheConfiguration(), registeredCaches.values());
 
-            for (DynamicCacheDescriptor desc : res.addedDescs) {
-                assert desc.template() || res.needExchange;
+        if (err != null) {
+            if (persistedCfgs)
+                res.errs.add(err);
+            else
+                ctx.cache().completeCacheStartFuture(req, false, err);
 
-                desc.startTopologyVersion(startTopVer);
-            }
+            return false;
         }
 
-        if (!F.isEmpty(reqsToComplete)) {
-            ctx.closure().callLocalSafe(new Callable<Void>() {
-                @Override public Void call() throws Exception {
-                    for (T2<DynamicCacheChangeRequest, AffinityTopologyVersion> t : reqsToComplete) {
-                        final DynamicCacheChangeRequest req = t.get1();
-                        AffinityTopologyVersion waitTopVer = t.get2();
+        CacheConfiguration<?, ?> ccfg = req.startCacheConfiguration();
 
-                        IgniteInternalFuture<?> fut = waitTopVer != null ?
-                            ctx.cache().context().exchange().affinityReadyFuture(waitTopVer) : null;
+        assert req.cacheType() != null : req;
+        assert F.eq(ccfg.getName(), cacheName) : req;
 
-                        if (fut == null || fut.isDone())
-                            ctx.cache().completeCacheStartFuture(req, false, null);
-                        else {
-                            fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
-                                @Override public void apply(IgniteInternalFuture<?> fut) {
-                                    ctx.cache().completeCacheStartFuture(req, false, null);
-                                }
-                            });
-                        }
-                    }
+        int cacheId = CU.cacheId(cacheName);
 
-                    return null;
-                }
-            });
+        CacheGroupDescriptor grpDesc = registerCacheGroup(exchangeActions,
+            topVer,
+            ccfg,
+            cacheId,
+            req.initiatingNodeId(),
+            req.deploymentId(),
+            req.encryptionKey());
+
+        DynamicCacheDescriptor startDesc = new DynamicCacheDescriptor(ctx,
+            ccfg,
+            req.cacheType(),
+            grpDesc,
+            false,
+            req.initiatingNodeId(),
+            false,
+            req.sql(),
+            req.deploymentId(),
+            req.schema());
+
+        DynamicCacheDescriptor old = registeredCaches.put(ccfg.getName(), startDesc);
+
+        restartingCaches.remove(ccfg.getName());
+
+        assert old == null;
+
+        ctx.discovery().setCacheFilter(
+            startDesc.cacheId(),
+            grpDesc.groupId(),
+            ccfg.getName(),
+            ccfg.getNearConfiguration() != null);
+
+        if (!persistedCfgs) {
+            ctx.discovery().addClientNode(cacheName,
+                req.initiatingNodeId(),
+                req.nearCacheConfiguration() != null);
         }
 
-        return res;
+        res.addedDescs.add(startDesc);
+
+        exchangeActions.addCacheToStart(req, startDesc);
+
+        return true;
     }
 
     /**
@@ -779,7 +884,7 @@ class ClusterCachesInfo {
      * @return Collection of currently restarting caches.
      */
     Collection<String> restartingCaches() {
-        return restartingCaches;
+        return restartingCaches.keySet();
     }
 
     /**
@@ -987,7 +1092,7 @@ class ClusterCachesInfo {
             templates.put(desc.cacheName(), cacheData);
         }
 
-        Collection<String> restarting = new HashSet<>(restartingCaches);
+        Collection<String> restarting = new HashSet<>(restartingCaches.keySet());
 
         return new CacheNodeCommonDiscoveryData(caches,
             templates,
@@ -1360,7 +1465,8 @@ class ClusterCachesInfo {
      * @return Exchange action.
      * @throws IgniteCheckedException If configuration validation failed.
      */
-    public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer, DiscoveryDataClusterState curState)
+    public ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer,
+        DiscoveryDataClusterState curState)
         throws IgniteCheckedException {
         ExchangeActions exchangeActions = new ExchangeActions();
 
@@ -1600,7 +1706,7 @@ class ClusterCachesInfo {
 
         //If conflict was detected we don't merge config and we leave existed config.
         if (!hasSchemaPatchConflict && !patchesToApply.isEmpty())
-            for(Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry: patchesToApply.entrySet()){
+            for (Map.Entry<DynamicCacheDescriptor, QuerySchemaPatch> entry : patchesToApply.entrySet()) {
                 if (entry.getKey().applySchemaPatch(entry.getValue()))
                     saveCacheConfiguration(entry.getKey());
             }
@@ -1761,7 +1867,8 @@ class ClusterCachesInfo {
         Integer cacheId,
         UUID rcvdFrom,
         IgniteUuid deploymentId,
-        @Nullable byte[] encKey) {
+        @Nullable byte[] encKey
+    ) {
         if (startedCacheCfg.getGroupName() != null) {
             CacheGroupDescriptor desc = cacheGroupByName(startedCacheCfg.getGroupName());
 
@@ -1814,7 +1921,8 @@ class ClusterCachesInfo {
      * @param exchActions Optional exchange actions to update if new group was added.
      * @param startedCacheCfg Started cache configuration.
      */
-    private boolean resolvePersistentFlag(@Nullable ExchangeActions exchActions, CacheConfiguration<?, ?> startedCacheCfg) {
+    private boolean resolvePersistentFlag(@Nullable ExchangeActions exchActions,
+        CacheConfiguration<?, ?> startedCacheCfg) {
         if (!ctx.clientNode()) {
             // On server, we always can determine whether cache is persistent by local storage configuration.
             return CU.isPersistentCache(startedCacheCfg, ctx.config().getDataStorageConfiguration());
@@ -1961,6 +2069,7 @@ class ClusterCachesInfo {
 
     /**
      * Returns registered cache descriptors ordered by {@code comparator}
+     *
      * @param comparator Comparator (DIRECT, REVERSE or custom) to order cache descriptors.
      * @return Ordered by comparator cache descriptors.
      */
@@ -2106,6 +2215,28 @@ class ClusterCachesInfo {
     }
 
     /**
+     * @param cacheName Cache name.
+     * @return {@code True} if cache is restarting.
+     */
+    public boolean isRestarting(String cacheName) {
+        return restartingCaches.containsKey(cacheName);
+    }
+
+    /**
+     * @param cacheName Cache name which restart were cancelled.
+     */
+    public void removeRestartingCache(String cacheName) {
+        restartingCaches.remove(cacheName);
+    }
+
+    /**
+     * Clear up information about restarting caches.
+     */
+    public void removeRestartingCaches() {
+        restartingCaches.clear();
+    }
+
+    /**
      * Holds direct comparator (first system caches) and reverse comparator (first user caches).
      * Use DIRECT comparator for ordering cache start operations.
      * Use REVERSE comparator for ordering cache stop operations.
index 5b8a89c..8128230 100644 (file)
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.io.Serializable;
+import java.util.UUID;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
@@ -25,9 +27,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
-import java.io.Serializable;
-import java.util.UUID;
-
 /**
  * Cache start/stop request.
  */
@@ -68,6 +67,9 @@ public class DynamicCacheChangeRequest implements Serializable {
     /** Restart flag. */
     private boolean restart;
 
+    /** Restart operation id. */
+    private IgniteUuid restartId;
+
     /** Cache active on start or not*/
     private boolean disabledAfterStart;
 
@@ -265,6 +267,20 @@ public class DynamicCacheChangeRequest implements Serializable {
     }
 
     /**
+     * @return Id of restart to allow only initiator start the restarting cache.
+     */
+    public IgniteUuid restartId() {
+        return restartId;
+    }
+
+    /**
+     * @param restartId Id of cache restart requester.
+     */
+    public void restartId(IgniteUuid restartId) {
+        this.restartId = restartId;
+    }
+
+    /**
      * @return Cache name.
      */
     public String cacheName() {
index 0735a88..ef861b9 100644 (file)
@@ -49,10 +49,8 @@ import org.apache.ignite.cache.query.QueryMetrics;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.internal.AsyncSupportAdapter;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.GridKernalState;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteClosure;
@@ -1565,12 +1563,7 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
             try {
                 IgniteInternalCache<K, V> cache = context().kernalContext().cache().<K, V>publicJCache(context().name()).internalProxy();
 
-                GridFutureAdapter<Void> fut = proxyImpl.opportunisticRestart();
-
-                if (fut == null)
-                    proxyImpl.onRestarted(cache.context(), cache.context().cache());
-                else
-                    new IgniteFutureImpl<>(fut).get();
+                proxyImpl.opportunisticRestart(cache);
 
                 return gate();
             } catch (IgniteCheckedException ice) {
@@ -1587,8 +1580,18 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite
     private CacheOperationGate onEnter() {
         GridCacheGateway<K, V> gate = checkProxyIsValid(gate(), true);
 
-        return new CacheOperationGate(gate,
-            lock ? gate.enter(opCtx) : gate.enterNoLock(opCtx));
+        try {
+            return new CacheOperationGate(gate,
+                lock ? gate.enter(opCtx) : gate.enterNoLock(opCtx));
+        }
+        catch (IllegalStateException e) {
+            boolean isCacheProxy = delegate instanceof IgniteCacheProxyImpl;
+
+            if (isCacheProxy)
+                ((IgniteCacheProxyImpl) delegate).checkRestart(true);
+
+            throw e; // If we reached this line.
+        }
     }
 
     /**
index a1c403b..f0e6cd3 100644 (file)
@@ -49,6 +49,7 @@ import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
index de1054b..d2304d4 100644 (file)
@@ -134,8 +134,8 @@ import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.nextDumpTimeout;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
 
@@ -1894,8 +1894,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         dumpPendingObjects(exchTopVer, diagCtx);
 
-        for (CacheGroupContext grp : cctx.cache().cacheGroups())
-            grp.preloader().dumpDebugInfo();
+        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+            GridCachePreloader preloader = grp.preloader();
+
+            if (preloader != null)
+                preloader.dumpDebugInfo();
+        }
 
         cctx.affinity().dumpDebugInfo();
 
index 1c44eaf..dadd719 100644 (file)
@@ -1120,8 +1120,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * Blocks all available gateways
      */
     public void blockGateways() {
-        for (IgniteCacheProxy<?, ?> proxy : jCacheProxies.values())
-            proxy.context().gate().onStopped();
+        for (IgniteCacheProxyImpl<?, ?> proxy : jCacheProxies.values())
+            proxy.context0().gate().onStopped();
     }
 
     /** {@inheritDoc} */
@@ -1861,7 +1861,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     proxy.onRestarted(cacheCtx, cache);
 
                     if (cacheCtx.dataStructuresCache())
-                        ctx.dataStructures().restart(proxy.internalProxy());
+                        ctx.dataStructures().restart(cache.name(), proxy.internalProxy());
                 }
             }
         }
@@ -2662,12 +2662,44 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 proxy.onRestarted(cacheCtx, cacheCtx.cache());
 
                 if (cacheCtx.dataStructuresCache())
-                    ctx.dataStructures().restart(proxy.internalProxy());
+                    ctx.dataStructures().restart(proxy.getName(), proxy.internalProxy());
             }
         }
     }
 
     /**
+     * Complete stopping of caches if they were marked as restarting but it failed.
+     * @return Cache names of proxies which were restarted.
+     */
+    public List<String> resetRestartingProxies() {
+        List<String> res = new ArrayList<>();
+
+        for (Map.Entry<String, IgniteCacheProxyImpl<?, ?>> e : jCacheProxies.entrySet()) {
+            IgniteCacheProxyImpl<?, ?> proxy = e.getValue();
+
+            if (proxy == null)
+                continue;
+
+            if (proxy.isRestarting()) {
+                String cacheName = e.getKey();
+
+                res.add(cacheName);
+
+                jCacheProxies.remove(cacheName);
+
+                proxy.onRestarted(null, null);
+
+                if (DataStructuresProcessor.isDataStructureCache(cacheName))
+                    ctx.dataStructures().restart(cacheName, null);
+            }
+        }
+
+        cachesInfo.removeRestartingCaches();
+
+        return res;
+    }
+
+    /**
      * @param desc Group descriptor.
      * @param cacheType Cache type.
      * @param affNode Affinity node flag.
@@ -2751,16 +2783,28 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 cache.active(false);
         }
 
-        if (proxy != null) {
-            if (stop) {
-                if (restart)
-                    proxy.restart();
+        if (stop) {
+            if (restart) {
+                GridCacheAdapter<?, ?> cache;
 
-                proxy.context().gate().stopped();
+                if (proxy == null && (cache = caches.get(cacheName)) != null) {
+                    proxy = new IgniteCacheProxyImpl(cache.context(), cache, false);
+
+                    IgniteCacheProxyImpl<?, ?> oldProxy = jCacheProxies.putIfAbsent(cacheName, proxy);
+
+                    if (oldProxy != null)
+                        proxy = oldProxy;
+                }
+
+                if (proxy != null)
+                    proxy.suspend();
             }
-            else
-                proxy.closeProxy();
+
+            if (proxy != null)
+                proxy.context0().gate().stopped();
         }
+        else if (proxy != null)
+            proxy.closeProxy();
     }
 
     /**
@@ -2784,7 +2828,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             proxy = jCacheProxies.get(req.cacheName());
 
             if (proxy != null)
-                proxy.restart();
+                proxy.suspend();
         }
         else {
             completeProxyInitialize(req.cacheName());
@@ -2793,7 +2837,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
 
         if (proxy != null)
-            proxy.context().gate().onStopped();
+            proxy.context0().gate().onStopped();
     }
 
     /**
@@ -2837,7 +2881,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     IgniteCacheProxyImpl<?, ?> newProxy = new IgniteCacheProxyImpl(cache.context(), cache, false);
 
                     if (!cache.active())
-                        newProxy.restart();
+                        newProxy.suspend();
 
                     addjCacheProxy(cacheCtx.name(), newProxy);
                 }
@@ -3708,6 +3752,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 sql,
                 failIfExists,
                 failIfNotStarted,
+                null,
                 false,
                 null,
                 ccfg != null && ccfg.isEncryptionEnabled() ? grpKeys.iterator().next() : null);
@@ -3812,15 +3857,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}.
      * @return Future that will be completed when all caches are deployed.
      */
-    public IgniteInternalFuture<Boolean> dynamicStartCaches(Collection<CacheConfiguration> ccfgList,
+    public IgniteInternalFuture<Boolean> dynamicStartCaches(
+        Collection<CacheConfiguration> ccfgList,
         boolean failIfExists,
-        boolean checkThreadTx, boolean disabledAfterStart) {
+        boolean checkThreadTx,
+        boolean disabledAfterStart
+    ) {
         return dynamicStartCachesByStoredConf(
             ccfgList.stream().map(StoredCacheData::new).collect(Collectors.toList()),
             failIfExists,
             checkThreadTx,
-            disabledAfterStart
-        );
+            disabledAfterStart,
+            null);
     }
 
     /**
@@ -3830,13 +3878,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param failIfExists Fail if exists flag.
      * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
      * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}.
+     * @param restartId Restart requester id (it'll allow to start this cache only him).
      * @return Future that will be completed when all caches are deployed.
      */
     public IgniteInternalFuture<Boolean> dynamicStartCachesByStoredConf(
         Collection<StoredCacheData> storedCacheDataList,
         boolean failIfExists,
         boolean checkThreadTx,
-        boolean disabledAfterStart) {
+        boolean disabledAfterStart,
+        IgniteUuid restartId
+    ) {
         if (checkThreadTx)
             checkEmptyTransactions();
 
@@ -3857,6 +3908,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     ccfg.sql(),
                     failIfExists,
                     true,
+                    restartId,
                     disabledAfterStart,
                     ccfg.queryEntities(),
                     ccfg.config().isEncryptionEnabled() ? grpKeysIter.next() : null);
@@ -3927,10 +3979,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * command.
      * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
      * @param restart Restart flag.
+     * @param restartId Restart requester id (it'll allow to start this cache only him).
      * @return Future that will be completed when cache is destroyed.
      */
-    public IgniteInternalFuture<Boolean> dynamicDestroyCache(String cacheName, boolean sql, boolean checkThreadTx,
-        boolean restart) {
+    public IgniteInternalFuture<Boolean> dynamicDestroyCache(
+        String cacheName,
+        boolean sql,
+        boolean checkThreadTx,
+        boolean restart,
+        IgniteUuid restartId
+    ) {
         assert cacheName != null;
 
         if (checkThreadTx)
@@ -3941,6 +3999,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         req.stop(true);
         req.destroy(true);
         req.restart(restart);
+        req.restartId(restartId);
 
         return F.first(initiateCacheChanges(F.asList(req)));
     }
@@ -3948,30 +4007,30 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * @param cacheNames Collection of cache names to destroy.
      * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
-     * @param restart Restart flag.
      * @return Future that will be completed when cache is destroyed.
      */
-    public IgniteInternalFuture<?> dynamicDestroyCaches(Collection<String> cacheNames, boolean checkThreadTx,
-        boolean restart) {
-        return dynamicDestroyCaches(cacheNames, checkThreadTx, restart, true);
+    public IgniteInternalFuture<?> dynamicDestroyCaches(Collection<String> cacheNames, boolean checkThreadTx) {
+        return dynamicDestroyCaches(cacheNames, checkThreadTx, true);
     }
 
     /**
      * @param cacheNames Collection of cache names to destroy.
      * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
-     * @param restart Restart flag.
      * @param destroy Cache data destroy flag. Setting to <code>true</code> will cause removing all cache data
      * @return Future that will be completed when cache is destroyed.
      */
-    public IgniteInternalFuture<?> dynamicDestroyCaches(Collection<String> cacheNames, boolean checkThreadTx,
-        boolean restart, boolean destroy) {
+    public IgniteInternalFuture<?> dynamicDestroyCaches(
+        Collection<String> cacheNames,
+        boolean checkThreadTx,
+        boolean destroy
+    ) {
         if (checkThreadTx)
             checkEmptyTransactions();
 
         List<DynamicCacheChangeRequest> reqs = new ArrayList<>(cacheNames.size());
 
         for (String cacheName : cacheNames) {
-            reqs.add(createStopRequest(cacheName, restart, destroy));
+            reqs.add(createStopRequest(cacheName, false, null, destroy));
         }
 
         return dynamicChangeCaches(reqs);
@@ -3982,15 +4041,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      *
      * @param cacheName Cache names to destroy.
      * @param restart Restart flag.
+     * @param restartId Restart requester id (it'll allow to start this cache only him).
      * @param destroy Cache data destroy flag. Setting to {@code true} will cause removing all cache data from store.
      * @return Future that will be completed when cache is destroyed.
      */
-    @NotNull public DynamicCacheChangeRequest createStopRequest(String cacheName, boolean restart, boolean destroy) {
+    @NotNull public DynamicCacheChangeRequest createStopRequest(String cacheName, boolean restart, IgniteUuid restartId, boolean destroy) {
         DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, cacheName, false, true);
 
         req.stop(true);
         req.destroy(destroy);
         req.restart(restart);
+        req.restartId(restartId);
 
         return req;
     }
@@ -4053,7 +4114,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         checkEmptyTransactions();
 
         if (proxy.context().isLocal())
-            return dynamicDestroyCache(cacheName, false, true, false);
+            return dynamicDestroyCache(cacheName, false, true, false, null);
 
         return startClientCacheChange(null, Collections.singleton(cacheName));
     }
@@ -4375,10 +4436,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Reset restarting caches.
+     * @param cacheName Cache to check.
+     * @return Cache is under restarting.
      */
-    public void resetRestartingCaches() {
-        cachesInfo.restartingCaches().clear();
+    public boolean isCacheRestarting(String cacheName) {
+        return cachesInfo.isRestarting(cacheName);
     }
 
     /**
@@ -4790,8 +4852,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         DynamicCacheDescriptor desc = cacheDescriptor(name);
 
-        if (desc == null)
+        if (desc == null) {
+            if (cachesInfo.isRestarting(name)) {
+                IgniteCacheProxyImpl<?, ?> proxy = jCacheProxies.get(name);
+
+                assert proxy != null: name;
+
+                proxy.internalProxy(); //should throw exception
+
+                // we have procceed, try again
+                return cacheConfiguration(name);
+            }
+
             throw new IllegalStateException("Cache doesn't exist: " + name);
+        }
         else
             return desc.cacheConfiguration();
     }
@@ -5264,6 +5338,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param sql Whether the cache needs to be created as the result of SQL {@code CREATE TABLE} command.
      * @param failIfExists Fail if exists flag.
      * @param failIfNotStarted If {@code true} fails if cache is not started.
+     * @param restartId Restart requester id (it'll allow to start this cache only him).
      * @param disabledAfterStart If true, cache proxies will be only activated after {@link #restartProxies()}.
      * @param qryEntities Query entities.
      * @param encKey Encryption key.
@@ -5279,6 +5354,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         boolean sql,
         boolean failIfExists,
         boolean failIfNotStarted,
+        IgniteUuid restartId,
         boolean disabledAfterStart,
         @Nullable Collection<QueryEntity> qryEntities,
         @Nullable byte[] encKey
@@ -5295,6 +5371,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         req.encryptionKey(encKey);
 
+        req.restartId(restartId);
+
         if (ccfg != null) {
             cloneCheckSerializable(ccfg);
 
@@ -5340,7 +5418,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 initialize(cfg, cacheObjCtx);
 
                 if (cachesInfo.restartingCaches().contains(req.cacheName()))
-                    req.schema(new QuerySchema(qryEntities));
+                    req.schema(new QuerySchema(qryEntities == null? cfg.getQueryEntities() : qryEntities));
                 else
                     req.schema(new QuerySchema(qryEntities != null ? QueryUtils.normalizeQueryEntities(qryEntities, cfg)
                             : cfg.getQueryEntities()));
index a6a1fc1..24b7f16 100644 (file)
@@ -31,6 +31,7 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import javax.cache.Cache;
@@ -100,7 +101,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.mxbean.CacheMetricsMXBean;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -122,9 +122,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     private static final IgniteProductVersion CONT_QRY_WITH_TRANSFORMER_SINCE =
         IgniteProductVersion.fromString("2.5.0");
 
+    /** Cache name. */
+    private String cacheName;
+
     /** Context. */
     private volatile GridCacheContext<K, V> ctx;
 
+    /** Old context. */
+    private transient volatile GridCacheContext<K, V> oldContext;
+
     /** Delegate. */
     @GridToStringInclude
     private volatile IgniteInternalCache<K, V> delegate;
@@ -185,6 +191,10 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
         assert ctx != null;
         assert delegate != null;
 
+        cacheName = ctx.name();
+
+        assert cacheName.equals(delegate.name()) : "ctx.name=" + cacheName + ", delegate.name=" + delegate.name();
+
         this.ctx = ctx;
         this.delegate = delegate;
 
@@ -203,6 +213,65 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
      * @return Context.
      */
     @Override public GridCacheContext<K, V> context() {
+        return getContextSafe();
+    }
+
+    /**
+     * @return Context or throw restart exception.
+     */
+    private GridCacheContext<K, V> getContextSafe() {
+        while (true) {
+            GridCacheContext<K, V> ctx = this.ctx;
+
+            if (ctx == null) {
+                checkRestart();
+
+                if (Thread.currentThread().isInterrupted())
+                    throw new IgniteException(new InterruptedException());
+            }
+            else
+                return ctx;
+        }
+    }
+
+    /**
+     * @return Delegate or throw restart exception.
+     */
+    private IgniteInternalCache<K, V> getDelegateSafe() {
+        while (true) {
+            IgniteInternalCache<K, V> delegate = this.delegate;
+
+            if (delegate == null) {
+                checkRestart();
+
+                if (Thread.currentThread().isInterrupted())
+                    throw new IgniteException(new InterruptedException());
+            }
+            else
+                return delegate;
+        }
+    }
+
+    /**
+     * @return Context.
+     */
+    public GridCacheContext<K, V> context0() {
+        GridCacheContext<K, V> ctx = this.ctx;
+
+        if (ctx == null) {
+            synchronized (this) {
+                ctx = this.ctx;
+
+                if (ctx == null) {
+                    GridCacheContext<K, V> context = oldContext;
+
+                    assert context != null;
+
+                    return context;
+                }
+            }
+        }
+
         return ctx;
     }
 
@@ -219,36 +288,49 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
             return cachedProxy;
 
         cachedProxy = new GatewayProtectedCacheProxy<>(this, new CacheOperationContext(), true);
+
         return cachedProxy;
     }
 
     /** {@inheritDoc} */
     @Override public CacheMetrics metrics() {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         return ctx.cache().clusterMetrics();
     }
 
     /** {@inheritDoc} */
     @Override public CacheMetrics metrics(ClusterGroup grp) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         return ctx.cache().clusterMetrics(grp);
     }
 
     /** {@inheritDoc} */
     @Override public CacheMetrics localMetrics() {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         return ctx.cache().localMetrics();
     }
 
     /** {@inheritDoc} */
     @Override public CacheMetricsMXBean mxBean() {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         return ctx.cache().clusterMxBean();
     }
 
     /** {@inheritDoc} */
     @Override public CacheMetricsMXBean localMxBean() {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         return ctx.cache().localMxBean();
     }
 
     /** {@inheritDoc} */
     @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         CacheConfiguration cfg = ctx.config();
 
         if (!clazz.isAssignableFrom(cfg.getClass()))
@@ -284,6 +366,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         try {
             if (isAsync()) {
                 if (ctx.cache().isLocal())
@@ -306,6 +390,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> loadCacheAsync(@Nullable IgniteBiPredicate<K, V> p,
         @Nullable Object... args) throws CacheException {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         try {
             if (ctx.cache().isLocal())
                 return (IgniteFuture<Void>)createFuture(ctx.cache().localLoadCacheAsync(p, args));
@@ -319,6 +405,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync())
                 setFuture(delegate.localLoadCacheAsync(p, args));
@@ -333,11 +421,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> localLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p,
         @Nullable Object... args) throws CacheException {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return (IgniteFuture<Void>)createFuture(delegate.localLoadCacheAsync(p, args));
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.getAndPutIfAbsentAsync(key, val));
@@ -354,6 +446,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<V> getAndPutIfAbsentAsync(K key, V val) throws CacheException {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.getAndPutIfAbsentAsync(key, val));
     }
 
@@ -364,6 +458,9 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public Lock lockAll(final Collection<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         //TODO: IGNITE-9324: add explicit locks support.
         MvccUtils.verifyMvccOperationSupport(ctx, "Lock");
 
@@ -372,6 +469,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public boolean isLocalLocked(K key, boolean byCurrThread) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return byCurrThread ? delegate.isLockedByThread(key) : delegate.isLocked(key);
     }
 
@@ -386,8 +485,9 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     private <T, R> QueryCursor<R> query(
         final ScanQuery scanQry,
         @Nullable final IgniteClosure<T, R> transformer,
-        @Nullable ClusterGroup grp)
-        throws IgniteCheckedException {
+        @Nullable ClusterGroup grp
+    ) throws IgniteCheckedException {
+        GridCacheContext<K, V> ctx = getContextSafe();
 
         CacheOperationContext opCtxCall = ctx.operationContextPerCall();
 
@@ -405,7 +505,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
             qry.projection(grp);
 
         final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN,
-            ctx.name(), ctx, new IgniteOutClosureX<GridCloseableIterator<R>>() {
+            cacheName, ctx, new IgniteOutClosureX<GridCloseableIterator<R>>() {
                 @Override public GridCloseableIterator<R> applyx() throws IgniteCheckedException {
                     return qry.executeScanQuery();
                 }
@@ -423,6 +523,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     @SuppressWarnings("unchecked")
     private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp)
         throws IgniteCheckedException {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         final CacheQuery qry;
 
         CacheOperationContext opCtxCall = ctx.operationContextPerCall();
@@ -515,11 +617,13 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
      * @return Local node cluster group.
      */
     private ClusterGroup projection(boolean loc) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         if (loc || ctx.isLocal() || ctx.isReplicatedAffinityNode())
             return ctx.kernalContext().grid().cluster().forLocal();
 
         if (ctx.isReplicated())
-            return ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()).forRandom();
+            return ctx.kernalContext().grid().cluster().forDataNodes(cacheName).forRandom();
 
         return null;
     }
@@ -534,6 +638,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
      */
     @SuppressWarnings("unchecked")
     private QueryCursor<Cache.Entry<K, V>> queryContinuous(AbstractContinuousQuery qry, boolean loc, boolean keepBinary) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         assert qry instanceof ContinuousQuery || qry instanceof ContinuousQueryWithTransformer;
 
         if (qry.getInitialQuery() instanceof ContinuousQuery ||
@@ -638,6 +744,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public List<FieldsQueryCursor<List<?>>> queryMultipleStatements(SqlFieldsQuery qry) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         A.notNull(qry, "qry");
         try {
             ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -662,6 +770,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public <R> QueryCursor<R> query(Query<R> qry) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         A.notNull(qry, "qry");
         try {
             ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -699,6 +809,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         A.notNull(qry, "qry");
         A.notNull(transformer, "transformer");
 
@@ -726,6 +838,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
      * @param qry Query.
      */
     private void convertToBinary(final Query qry) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         if (ctx.binaryMarshaller()) {
             if (qry instanceof SqlQuery) {
                 final SqlQuery sqlQry = (SqlQuery) qry;
@@ -754,6 +868,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
         if (args == null)
             return;
 
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         for (int i = 0; i < args.length; i++)
             args[i] = ctx.cacheObjects().binary().toBinary(args[i]);
     }
@@ -765,10 +881,12 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
      * @throws CacheException If query indexing disabled for sql query.
      */
     private void validate(Query qry) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         if (!QueryUtils.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) &&
             !(qry instanceof ContinuousQuery) && !(qry instanceof ContinuousQueryWithTransformer) &&
             !(qry instanceof SpiQuery) && !(qry instanceof SqlQuery) && !(qry instanceof SqlFieldsQuery))
-            throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name() +
+            throw new CacheException("Indexing is disabled for cache: " + cacheName +
                     ". Use setIndexedTypes or setTypeMetadata methods on CacheConfiguration to enable.");
 
         if (!ctx.kernalContext().query().moduleEnabled() &&
@@ -779,6 +897,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             return delegate.localEntries(peekModes);
         }
@@ -789,31 +909,43 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public QueryMetrics queryMetrics() {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return delegate.context().queries().metrics();
     }
 
     /** {@inheritDoc} */
     @Override public void resetQueryMetrics() {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         delegate.context().queries().resetMetrics();
     }
 
     /** {@inheritDoc} */
     @Override public Collection<? extends QueryDetailMetrics> queryDetailMetrics() {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return delegate.context().queries().detailMetrics();
     }
 
     /** {@inheritDoc} */
     @Override public void resetQueryDetailMetrics() {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         delegate.context().queries().resetDetailMetrics();
     }
 
     /** {@inheritDoc} */
     @Override public void localEvict(Collection<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         delegate.evictAll(keys);
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             return delegate.localPeek(key, peekModes);
         }
@@ -824,6 +956,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public int size(CachePeekMode... peekModes) throws CacheException {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.sizeAsync(peekModes));
@@ -840,11 +974,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Integer> sizeAsync(CachePeekMode... peekModes) throws CacheException {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.sizeAsync(peekModes));
     }
 
     /** {@inheritDoc} */
     @Override public long sizeLong(CachePeekMode... peekModes) throws CacheException {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.sizeLongAsync(peekModes));
@@ -861,11 +999,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Long> sizeLongAsync(CachePeekMode... peekModes) throws CacheException {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.sizeLongAsync(peekModes));
     }
 
     /** {@inheritDoc} */
     @Override public long sizeLong(int part, CachePeekMode... peekModes) throws CacheException {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.sizeLongAsync(part, peekModes));
@@ -882,11 +1024,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Long> sizeLongAsync(int part, CachePeekMode... peekModes) throws CacheException {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.sizeLongAsync(part, peekModes));
     }
 
     /** {@inheritDoc} */
     @Override public int localSize(CachePeekMode... peekModes) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             return delegate.localSize(peekModes);
         }
@@ -897,6 +1043,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public long localSizeLong(CachePeekMode... peekModes) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             return delegate.localSizeLong(peekModes);
         }
@@ -907,6 +1055,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public long localSizeLong(int part, CachePeekMode... peekModes) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             return delegate.localSizeLong(part, peekModes);
         }
@@ -917,6 +1067,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public V get(K key) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.getAsync(key));
@@ -933,11 +1085,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<V> getAsync(K key) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.getAsync(key));
     }
 
     /** {@inheritDoc} */
     @Override public CacheEntry<K, V> getEntry(K key) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.getEntryAsync(key));
@@ -954,11 +1110,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<CacheEntry<K, V>> getEntryAsync(K key) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.getEntryAsync(key));
     }
 
     /** {@inheritDoc} */
     @Override public Map<K, V> getAll(Set<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.getAllAsync(keys));
@@ -975,11 +1135,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Map<K, V>> getAllAsync(Set<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.getAllAsync(keys));
     }
 
     /** {@inheritDoc} */
     @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.getEntriesAsync(keys));
@@ -996,11 +1160,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(Set<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.getEntriesAsync(keys));
     }
 
     /** {@inheritDoc} */
     @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.getAllOutTxAsync(keys));
@@ -1017,6 +1185,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.getAllOutTxAsync(keys));
     }
 
@@ -1025,6 +1195,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
      * @return Values map.
      */
     public Map<K, V> getAll(Collection<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.getAllAsync(keys));
@@ -1041,6 +1213,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public boolean containsKey(K key) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         if (isAsync()) {
             setFuture(delegate.containsKeyAsync(key));
 
@@ -1052,11 +1226,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Boolean> containsKeyAsync(K key) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.containsKeyAsync(key));
     }
 
     /** {@inheritDoc} */
     @Override public boolean containsKeys(Set<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         if (isAsync()) {
             setFuture(delegate.containsKeysAsync(keys));
 
@@ -1068,6 +1246,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Boolean> containsKeysAsync(Set<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.containsKeysAsync(keys));
     }
 
@@ -1077,6 +1257,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
         boolean replaceExisting,
         @Nullable final CompletionListener completionLsnr
     ) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         IgniteInternalFuture<?> fut = ctx.cache().loadAll(keys, replaceExisting);
 
         if (completionLsnr != null) {
@@ -1097,6 +1279,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public void put(K key, V val) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync())
                 setFuture(putAsync0(key, val));
@@ -1121,6 +1305,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
      * @return Internal future.
      */
     private IgniteInternalFuture<Void> putAsync0(K key, V val) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         IgniteInternalFuture<Boolean> fut = delegate.putAsync(key, val);
 
         return fut.chain(new CX1<IgniteInternalFuture<Boolean>, Void>() {
@@ -1139,6 +1325,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public V getAndPut(K key, V val) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.getAndPutAsync(key, val));
@@ -1155,11 +1343,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<V> getAndPutAsync(K key, V val) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.getAndPutAsync(key, val));
     }
 
     /** {@inheritDoc} */
     @Override public void putAll(Map<? extends K, ? extends V> map) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync())
                 setFuture(delegate.putAllAsync(map));
@@ -1173,11 +1365,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return (IgniteFuture<Void>)createFuture(delegate.putAllAsync(map));
     }
 
     /** {@inheritDoc} */
     @Override public boolean putIfAbsent(K key, V val) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.putIfAbsentAsync(key, val));
@@ -1194,11 +1390,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Boolean> putIfAbsentAsync(K key, V val) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.putIfAbsentAsync(key, val));
     }
 
     /** {@inheritDoc} */
     @Override public boolean remove(K key) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.removeAsync(key));
@@ -1215,11 +1415,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Boolean> removeAsync(K key) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.removeAsync(key));
     }
 
     /** {@inheritDoc} */
     @Override public boolean remove(K key, V oldVal) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.removeAsync(key, oldVal));
@@ -1236,11 +1440,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Boolean> removeAsync(K key, V oldVal) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.removeAsync(key, oldVal));
     }
 
     /** {@inheritDoc} */
     @Override public V getAndRemove(K key) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.getAndRemoveAsync(key));
@@ -1257,11 +1465,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<V> getAndRemoveAsync(K key) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.getAndRemoveAsync(key));
     }
 
     /** {@inheritDoc} */
     @Override public boolean replace(K key, V oldVal, V newVal) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.replaceAsync(key, oldVal, newVal));
@@ -1278,11 +1490,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.replaceAsync(key, oldVal, newVal));
     }
 
     /** {@inheritDoc} */
     @Override public boolean replace(K key, V val) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.replaceAsync(key, val));
@@ -1299,11 +1515,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Boolean> replaceAsync(K key, V val) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.replaceAsync(key, val));
     }
 
     /** {@inheritDoc} */
     @Override public V getAndReplace(K key, V val) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.getAndReplaceAsync(key, val));
@@ -1320,11 +1540,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<V> getAndReplaceAsync(K key, V val) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.getAndReplaceAsync(key, val));
     }
 
     /** {@inheritDoc} */
     @Override public void removeAll(Set<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync())
                 setFuture(delegate.removeAllAsync(keys));
@@ -1338,11 +1562,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> removeAllAsync(Set<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return (IgniteFuture<Void>)createFuture(delegate.removeAllAsync(keys));
     }
 
     /** {@inheritDoc} */
     @Override public void removeAll() {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync())
                 setFuture(delegate.removeAllAsync());
@@ -1356,11 +1584,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> removeAllAsync() {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return (IgniteFuture<Void>)createFuture(delegate.removeAllAsync());
     }
 
     /** {@inheritDoc} */
     @Override public void clear(K key) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync())
                 setFuture(delegate.clearAsync(key));
@@ -1374,11 +1606,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> clearAsync(K key) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return (IgniteFuture<Void>)createFuture(delegate.clearAsync(key));
     }
 
     /** {@inheritDoc} */
     @Override public void clearAll(Set<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync())
                 setFuture(delegate.clearAllAsync(keys));
@@ -1392,11 +1628,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> clearAllAsync(Set<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return (IgniteFuture<Void>)createFuture(delegate.clearAllAsync(keys));
     }
 
     /** {@inheritDoc} */
     @Override public void clear() {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync())
                 setFuture(delegate.clearAsync());
@@ -1410,16 +1650,22 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> clearAsync() {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return (IgniteFuture<Void>)createFuture(delegate.clearAsync());
     }
 
     /** {@inheritDoc} */
     @Override public void localClear(K key) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         delegate.clearLocally(key);
     }
 
     /** {@inheritDoc} */
     @Override public void localClearAll(Set<? extends K> keys) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         for (K key : keys)
             delegate.clearLocally(key);
     }
@@ -1427,6 +1673,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     /** {@inheritDoc} */
     @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
         throws EntryProcessorException {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(invokeAsync0(key, entryProcessor, args));
@@ -1459,6 +1707,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
      * @return Internal future.
      */
     private <T> IgniteInternalFuture<T> invokeAsync0(K key, EntryProcessor<K, V, T> entryProcessor, Object[] args) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args);
 
         return fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() {
@@ -1497,7 +1747,10 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     public <T> T invoke(@Nullable AffinityTopologyVersion topVer,
         K key,
         EntryProcessor<K, V, T> entryProcessor,
-        Object... args) {
+        Object... args
+    ) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync())
                 throw new UnsupportedOperationException();
@@ -1515,7 +1768,10 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     /** {@inheritDoc} */
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
         EntryProcessor<K, V, T> entryProcessor,
-        Object... args) {
+        Object... args
+    ) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.invokeAllAsync(keys, entryProcessor, args));
@@ -1533,13 +1789,19 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     /** {@inheritDoc} */
     @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
         EntryProcessor<K, V, T> entryProcessor, Object... args) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.invokeAllAsync(keys, entryProcessor, args));
     }
 
     /** {@inheritDoc} */
-    @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+    @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+        Set<? extends K> keys,
         CacheEntryProcessor<K, V, T> entryProcessor,
-        Object... args) {
+        Object... args
+    ) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.invokeAllAsync(keys, entryProcessor, args));
@@ -1557,6 +1819,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     /** {@inheritDoc} */
     @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
         CacheEntryProcessor<K, V, T> entryProcessor, Object... args) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.invokeAllAsync(keys, entryProcessor, args));
     }
 
@@ -1564,6 +1828,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
         Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
         Object... args) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             if (isAsync()) {
                 setFuture(delegate.invokeAllAsync(map, args));
@@ -1581,12 +1847,14 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     /** {@inheritDoc} */
     @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
         Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return createFuture(delegate.invokeAllAsync(map, args));
     }
 
     /** {@inheritDoc} */
     @Override public String getName() {
-        return delegate.name();
+        return cacheName;
     }
 
     /** {@inheritDoc} */
@@ -1608,7 +1876,9 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<?> destroyAsync() {
-        return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicDestroyCache(ctx.name(), false, true, false));
+        GridCacheContext<K, V> ctx = getContextSafe();
+
+        return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicDestroyCache(cacheName, false, true, false, null));
     }
 
     /** {@inheritDoc} */
@@ -1618,11 +1888,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<?> closeAsync() {
-        return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicCloseCache(ctx.name()));
+        GridCacheContext<K, V> ctx = getContextSafe();
+
+        return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicCloseCache(cacheName));
     }
 
     /** {@inheritDoc} */
     @Override public boolean isClosed() {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         return ctx.kernalContext().cache().context().closed(ctx);
     }
 
@@ -1630,14 +1904,19 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     @Override public <T> T unwrap(Class<T> clazz) {
         if (clazz.isAssignableFrom(getClass()))
             return (T)this;
-        else if (clazz.isAssignableFrom(IgniteEx.class))
+        else if (clazz.isAssignableFrom(IgniteEx.class)) {
+            GridCacheContext<K, V> ctx = getContextSafe();
+
             return (T)ctx.grid();
+        }
 
         throw new IllegalArgumentException("Unwrapping to class is not supported: " + clazz);
     }
 
     /** {@inheritDoc} */
     @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         try {
             CacheOperationContext opCtx = ctx.operationContextPerCall();
 
@@ -1650,6 +1929,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         try {
             ctx.continuousQueries().cancelJCacheQuery(lsnrCfg);
         }
@@ -1660,6 +1941,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public Iterator<Cache.Entry<K, V>> iterator() {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         try {
             return ctx.cache().igniteIterator();
         }
@@ -1670,6 +1953,9 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override protected IgniteCache<K, V> createAsyncInstance() {
+        GridCacheContext<K, V> ctx = getContextSafe();
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return new IgniteCacheProxyImpl<K, V>(
                 ctx,
                 delegate,
@@ -1741,10 +2027,25 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     private RuntimeException cacheException(Exception e) {
         GridFutureAdapter<Void> restartFut = this.restartFut.get();
 
+        if (X.hasCause(e, IgniteCacheRestartingException.class)) {
+            IgniteCacheRestartingException restartingException = X.cause(e, IgniteCacheRestartingException.class);
+
+            if (restartingException.restartFuture() == null) {
+                if (restartFut == null)
+                    restartFut = suspend();
+
+                assert restartFut != null;
+
+                throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut), cacheName);
+            }
+            else
+                throw restartingException;
+        }
+
         if (restartFut != null) {
             if (X.hasCause(e, CacheStoppedException.class) || X.hasSuppressed(e, CacheStoppedException.class))
                 throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut), "Cache is restarting: " +
-                        ctx.name(), e);
+                        cacheName, e);
         }
 
         if (e instanceof IgniteException && X.hasCause(e, CacheException.class))
@@ -1778,7 +2079,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
      * @return Internal proxy.
      */
     @Override public GridCacheProxyImpl<K, V> internalProxy() {
-        checkRestart();
+        GridCacheContext<K, V> ctx = getContextSafe();
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
 
         return new GridCacheProxyImpl<>(ctx, delegate, ctx.operationContextPerCall());
     }
@@ -1799,11 +2101,15 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public Collection<Integer> lostPartitions() {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         return delegate.lostPartitions();
     }
 
     /** {@inheritDoc} */
     @Override public void enableStatistics(boolean enabled) {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         try {
             ctx.kernalContext().cache().enableStatistics(Collections.singleton(getName()), enabled);
         }
@@ -1814,6 +2120,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public void clearStatistics() {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         try {
             ctx.kernalContext().cache().clearStatistics(Collections.singleton(getName()));
         }
@@ -1824,6 +2132,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public void preloadPartition(int part) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             delegate.preloadPartition(part);
         }
@@ -1834,6 +2144,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Void> preloadPartitionAsync(int part) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             return (IgniteFuture<Void>)createFuture(delegate.preloadPartitionAsync(part));
         }
@@ -1844,6 +2156,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override public boolean localPreloadPartition(int part) {
+        IgniteInternalCache<K, V> delegate = getDelegateSafe();
+
         try {
             return delegate.localPreloadPartition(part);
         }
@@ -1864,15 +2178,23 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
         ctx = (GridCacheContext<K, V>)in.readObject();
 
         delegate = (IgniteInternalCache<K, V>)in.readObject();
+
+        cacheName = ctx.name();
+
+        assert cacheName.equals(delegate.name()) : "ctx.name=" + cacheName + ", delegate.name=" + delegate.name();
     }
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<Boolean> rebalance() {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         return new IgniteFutureImpl<>(ctx.preloader().forceRebalance());
     }
 
     /** {@inheritDoc} */
     @Override public IgniteFuture<?> indexReadyFuture() {
+        GridCacheContext<K, V> ctx = getContextSafe();
+
         IgniteInternalFuture fut = ctx.shared().database().indexRebuildFuture(ctx.cacheId());
 
         if (fut == null)
@@ -1885,10 +2207,29 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
      * Throws {@code IgniteCacheRestartingException} if proxy is restarting.
      */
     public void checkRestart() {
+       checkRestart(false);
+    }
+
+    /**
+     * Throws {@code IgniteCacheRestartingException} if proxy is restarting.
+     */
+    public void checkRestart(boolean noWait) {
         RestartFuture currentFut = restartFut.get();
 
-        if (currentFut != null)
-            currentFut.checkRestartOrAwait();
+        if (currentFut != null) {
+            try {
+                if (!noWait) {
+                    currentFut.get(1, TimeUnit.SECONDS);
+
+                    return;
+                }
+            }
+            catch (IgniteCheckedException ignore) {
+                //do nothing
+            }
+
+            throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(currentFut), cacheName);
+        }
     }
 
     /**
@@ -1899,26 +2240,33 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     }
 
     /**
-     * Restarts this cache proxy.
+     * Suspend this cache proxy.
+     * To make cache proxy active again, it's needed to restart it.
      */
-    public boolean restart() {
-        RestartFuture restartFut = new RestartFuture(ctx.name());
-
-         RestartFuture curFut = this.restartFut.get();
-
-        boolean changed = this.restartFut.compareAndSet(curFut, restartFut);
+    public RestartFuture suspend() {
+        while (true) {
+            RestartFuture curFut = this.restartFut.get();
+
+            if (curFut == null) {
+                RestartFuture restartFut = new RestartFuture(cacheName);
+
+                if (this.restartFut.compareAndSet(null, restartFut)) {
+                    synchronized (this) {
+                        if (!restartFut.isDone()) {
+                            if (oldContext == null) {
+                                oldContext = ctx;
+                                delegate = null;
+                                ctx = null;
+                            }
+                        }
+                    }
 
-        if (changed && curFut != null)
-            restartFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
-                @Override public void apply(IgniteInternalFuture<Void> fut) {
-                    if (fut.error() != null)
-                        curFut.onDone(fut.error());
-                    else
-                        curFut.onDone();
+                    return restartFut;
                 }
-            });
-
-        return changed;
+            }
+            else
+                return curFut;
+        }
     }
 
     /**
@@ -1934,19 +2282,30 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     /**
      * If proxy is already being restarted, returns future to wait on, else restarts this cache proxy.
      *
-     * @return Future to wait on, or null.
+     * @param cache To use for restart proxy.
      */
-    public GridFutureAdapter<Void> opportunisticRestart() {
-        RestartFuture restartFut = new RestartFuture(ctx.name());
+    public void opportunisticRestart(IgniteInternalCache<K, V> cache) {
+        RestartFuture restartFut = new RestartFuture(cacheName);
 
         while (true) {
-            if (this.restartFut.compareAndSet(null, restartFut))
-                return null;
+            if (this.restartFut.compareAndSet(null, restartFut)) {
+                onRestarted(cache.context(), cache.context().cache());
+
+                return;
+            }
 
             GridFutureAdapter<Void> curFut = this.restartFut.get();
 
-            if (curFut != null)
-                return curFut;
+            if (curFut != null) {
+                try {
+                    curFut.get();
+                }
+                catch (IgniteCheckedException ignore) {
+                    // Do notrhing.
+                }
+
+                return;
+            }
         }
     }
 
@@ -1961,12 +2320,18 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
         assert restartFut != null;
 
-        this.ctx = ctx;
-        this.delegate = delegate;
+        synchronized (this) {
+            this.restartFut.compareAndSet(restartFut, null);
+
+            this.ctx = ctx;
+            oldContext = null;
+            this.delegate = delegate;
 
-        this.restartFut.compareAndSet(restartFut, null);
+            restartFut.onDone();
+        }
 
-        restartFut.onDone();
+        assert delegate == null || cacheName.equals(delegate.name()) && cacheName.equals(ctx.name()) :
+                "ctx.name=" + ctx.name() + ", delegate.name=" + delegate.name() + ", cacheName=" + cacheName;
     }
 
     /**
index ab4287e..447f16f 100644 (file)
@@ -31,6 +31,7 @@ import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorResult;
+import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -1761,7 +1762,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         try {
                             if (top.stopping()) {
-                                res.addFailedKeys(req.keys(), new CacheStoppedException(name()));
+                                if (ctx.shared().cache().isCacheRestarting(name()))
+                                    res.addFailedKeys(req.keys(), new IgniteCacheRestartingException(name()));
+                                else
+                                    res.addFailedKeys(req.keys(), new CacheStoppedException(name()));
 
                                 completionCb.apply(req, res);
 
index d4e81e0..264dbe7 100644 (file)
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -403,8 +404,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         AffinityTopologyVersion topVer;
 
         if (cache.topology().stopping()) {
-            completeFuture(null,new CacheStoppedException(
-                cache.name()),
+            completeFuture(
+                null,
+                cctx.shared().cache().isCacheRestarting(cache.name())?
+                    new IgniteCacheRestartingException(cache.name()):
+                    new CacheStoppedException(cache.name()),
                 null);
 
             return;
index 072918e..3835d6a 100644 (file)
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
@@ -623,7 +624,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         AffinityTopologyVersion topVer;
 
         if (cache.topology().stopping()) {
-            completeFuture(null,new CacheStoppedException(cache.name()), null);
+            completeFuture(
+                null,
+                cctx.shared().cache().isCacheRestarting(cache.name())?
+                    new IgniteCacheRestartingException(cache.name()):
+                    new CacheStoppedException(cache.name()),
+                null);
 
             return;
         }
index 2c81036..fec572e 100644 (file)
@@ -29,6 +29,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
@@ -830,7 +831,10 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
 
         try {
             if (cctx.topology().stopping()) {
-                onDone(new CacheStoppedException(cctx.name()));
+                onDone(
+                    cctx.shared().cache().isCacheRestarting(cctx.name())?
+                        new IgniteCacheRestartingException(cctx.name()):
+                        new CacheStoppedException(cctx.name()));
 
                 return;
             }
index 3691250..ba882c0 100644 (file)
@@ -30,6 +30,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
@@ -893,7 +894,10 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
 
         try {
             if (cctx.topology().stopping()) {
-                onDone(new CacheStoppedException(cctx.name()));
+                onDone(
+                    cctx.shared().cache().isCacheRestarting(cctx.name())?
+                        new IgniteCacheRestartingException(cctx.name()):
+                        new CacheStoppedException(cctx.name()));
 
                 return;
             }
index b52d440..27b2fd7 100644 (file)
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
@@ -317,7 +318,10 @@ public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoun
 
         try {
             if (cctx.topology().stopping()) {
-                onDone(new CacheStoppedException(cctx.name()));
+                onDone(
+                    cctx.shared().cache().isCacheRestarting(cctx.name())?
+                        new IgniteCacheRestartingException(cctx.name()):
+                        new CacheStoppedException(cctx.name()));
 
                 return;
             }
index c13bf0e..7f8a121 100644 (file)
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
+import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -111,7 +112,10 @@ public class TxTopologyVersionFuture extends GridFutureAdapter<AffinityTopologyV
 
         try {
             if (cctx.topology().stopping()) {
-                onDone(new CacheStoppedException(cctx.name()));
+                onDone(
+                    cctx.shared().cache().isCacheRestarting(cctx.name())?
+                        new IgniteCacheRestartingException(cctx.name()):
+                        new CacheStoppedException(cctx.name()));
 
                 return;
             }
index 5a4eb8b..e4178f7 100755 (executable)
@@ -40,6 +40,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
@@ -69,6 +70,7 @@ import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolde
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.GridStripedReadWriteLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -159,6 +161,10 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
     /** */
     private final Set<Integer> grpsWithoutIdx = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
 
+    /** */
+    private final GridStripedReadWriteLock initDirLock =
+        new GridStripedReadWriteLock(Math.max(Runtime.getRuntime().availableProcessors(), 8));
+
     /**
      * @param ctx Kernal context.
      */
@@ -709,69 +715,78 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
     private boolean checkAndInitCacheWorkDir(File cacheWorkDir) throws IgniteCheckedException {
         boolean dirExisted = false;
 
-        if (!Files.exists(cacheWorkDir.toPath())) {
-            try {
-                Files.createDirectory(cacheWorkDir.toPath());
-            }
-            catch (IOException e) {
-                throw new IgniteCheckedException("Failed to initialize cache working directory " +
-                    "(failed to create, make sure the work folder has correct permissions): " +
-                    cacheWorkDir.getAbsolutePath(), e);
+        ReadWriteLock lock = initDirLock.getLock(cacheWorkDir.getName().hashCode());
+
+        lock.writeLock().lock();
+
+        try {
+            if (!Files.exists(cacheWorkDir.toPath())) {
+                try {
+                    Files.createDirectory(cacheWorkDir.toPath());
+                }
+                catch (IOException e) {
+                    throw new IgniteCheckedException("Failed to initialize cache working directory " +
+                        "(failed to create, make sure the work folder has correct permissions): " +
+                        cacheWorkDir.getAbsolutePath(), e);
+                }
             }
-        }
-        else {
-            if (cacheWorkDir.isFile())
-                throw new IgniteCheckedException("Failed to initialize cache working directory " +
-                    "(a file with the same name already exists): " + cacheWorkDir.getAbsolutePath());
+            else {
+                if (cacheWorkDir.isFile())
+                    throw new IgniteCheckedException("Failed to initialize cache working directory " +
+                        "(a file with the same name already exists): " + cacheWorkDir.getAbsolutePath());
 
-            File lockF = new File(cacheWorkDir, IgniteCacheSnapshotManager.SNAPSHOT_RESTORE_STARTED_LOCK_FILENAME);
+                File lockF = new File(cacheWorkDir, IgniteCacheSnapshotManager.SNAPSHOT_RESTORE_STARTED_LOCK_FILENAME);
 
-            Path cacheWorkDirPath = cacheWorkDir.toPath();
+                Path cacheWorkDirPath = cacheWorkDir.toPath();
 
-            Path tmp = cacheWorkDirPath.getParent().resolve(cacheWorkDir.getName() + TMP_SUFFIX);
+                Path tmp = cacheWorkDirPath.getParent().resolve(cacheWorkDir.getName() + TMP_SUFFIX);
 
-            if (Files.exists(tmp) && Files.isDirectory(tmp) &&
+                if (Files.exists(tmp) && Files.isDirectory(tmp) &&
                     Files.exists(tmp.resolve(IgniteCacheSnapshotManager.TEMP_FILES_COMPLETENESS_MARKER))) {
 
-                U.warn(log, "Ignite node crashed during the snapshot restore process " +
-                    "(there is a snapshot restore lock file left for cache). But old version of cache was saved. " +
-                    "Trying to restore it. Cache - [" + cacheWorkDir.getAbsolutePath() + ']');
+                    U.warn(log, "Ignite node crashed during the snapshot restore process " +
+                        "(there is a snapshot restore lock file left for cache). But old version of cache was saved. " +
+                        "Trying to restore it. Cache - [" + cacheWorkDir.getAbsolutePath() + ']');
 
-                U.delete(cacheWorkDir);
+                    U.delete(cacheWorkDir);
 
-                try {
-                    Files.move(tmp, cacheWorkDirPath, StandardCopyOption.ATOMIC_MOVE);
+                    try {
+                        Files.move(tmp, cacheWorkDirPath, StandardCopyOption.ATOMIC_MOVE);
 
-                    cacheWorkDirPath.resolve(IgniteCacheSnapshotManager.TEMP_FILES_COMPLETENESS_MARKER).toFile().delete();
-                }
-                catch (IOException e) {
-                    throw new IgniteCheckedException(e);
+                        cacheWorkDirPath.resolve(IgniteCacheSnapshotManager.TEMP_FILES_COMPLETENESS_MARKER).toFile().delete();
+                    }
+                    catch (IOException e) {
+                        throw new IgniteCheckedException(e);
+                    }
                 }
-            }
-            else if (lockF.exists()) {
-                U.warn(log, "Ignite node crashed during the snapshot restore process " +
-                    "(there is a snapshot restore lock file left for cache). Will remove both the lock file and " +
-                    "incomplete cache directory [cacheDir=" + cacheWorkDir.getAbsolutePath() + ']');
+                else if (lockF.exists()) {
+                    U.warn(log, "Ignite node crashed during the snapshot restore process " +
+                        "(there is a snapshot restore lock file left for cache). Will remove both the lock file and " +
+                        "incomplete cache directory [cacheDir=" + cacheWorkDir.getAbsolutePath() + ']');
 
-                boolean deleted = U.delete(cacheWorkDir);
+                    boolean deleted = U.delete(cacheWorkDir);
 
-                if (!deleted)
-                    throw new IgniteCheckedException("Failed to remove obsolete cache working directory " +
-                        "(remove the directory manually and make sure the work folder has correct permissions): " +
-                        cacheWorkDir.getAbsolutePath());
+                    if (!deleted)
+                        throw new IgniteCheckedException("Failed to remove obsolete cache working directory " +
+                            "(remove the directory manually and make sure the work folder has correct permissions): " +
+                            cacheWorkDir.getAbsolutePath());
 
-                cacheWorkDir.mkdirs();
-            }
-            else
-                dirExisted = true;
+                    cacheWorkDir.mkdirs();
+                }
+                else
+                    dirExisted = true;
 
-            if (!cacheWorkDir.exists())
-                throw new IgniteCheckedException("Failed to initialize cache working directory " +
-                    "(failed to create, make sure the work folder has correct permissions): " +
-                    cacheWorkDir.getAbsolutePath());
+                if (!cacheWorkDir.exists())
+                    throw new IgniteCheckedException("Failed to initialize cache working directory " +
+                        "(failed to create, make sure the work folder has correct permissions): " +
+                        cacheWorkDir.getAbsolutePath());
 
-            if (Files.exists(tmp))
-                U.delete(tmp);
+                if (Files.exists(tmp))
+                    U.delete(tmp);
+            }
+        }
+        finally {
+            lock.writeLock().unlock();
         }
 
         return dirExisted;
index 9e15c8d..a3842a7 100644 (file)
@@ -37,7 +37,7 @@ public interface PageMemoryEx extends PageMemory {
      * @param grpId Group ID.
      * @param pageId Page ID.
      * @param page Page pointer.
-     * @param restore Determines if the page is locked for restore.
+     * @param restore Determines if the page is locked for restore memory (crash recovery).
      * @return ByteBuffer for modifying the page.
      */
     long writeLock(int grpId, long pageId, long page, boolean restore);
index 808b7ca..b1e1b02 100644 (file)
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
@@ -148,7 +149,10 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
         cacheCtx.topology().readLock();
 
         if (cacheCtx.topology().stopping()) {
-            fut.onDone(new CacheStoppedException(cacheCtx.name()));
+            fut.onDone(
+                cctx.cache().isCacheRestarting(cacheCtx.name())?
+                    new IgniteCacheRestartingException(cacheCtx.name()):
+                    new CacheStoppedException(cacheCtx.name()));
 
             return null;
         }
index 8a00244..9cbea0f 100644 (file)
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheInterceptor;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -293,7 +294,10 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
         nonLocCtx.topology().readLock();
 
         if (nonLocCtx.topology().stopping()) {
-            fut.onDone(new CacheStoppedException(nonLocCtx.name()));
+            fut.onDone(
+                cctx.cache().isCacheRestarting(nonLocCtx.name())?
+                    new IgniteCacheRestartingException(nonLocCtx.name()):
+                    new CacheStoppedException(nonLocCtx.name()));
 
             return null;
         }
index 2c336a0..1767234 100644 (file)
@@ -149,7 +149,7 @@ public abstract class AtomicDataStructureProxy<V extends AtomicDataStructureValu
      * @return Error.
      */
     private IllegalStateException suspendedError() {
-        throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(suspendFut), "Underlying cache is restarting: " + ctx.name());
+        throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(suspendFut), ctx.name());
     }
 
     /** {@inheritDoc} */
index ea78f6c..cc799fc 100644 (file)
@@ -704,6 +704,10 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
         });
     }
 
+    /**
+     * Would suspend calls for this cache if it is atomics cache.
+     * @param cacheName To suspend.
+     */
     public void suspend(String cacheName) {
         for (Map.Entry<GridCacheInternalKey, GridCacheRemovable> e : dsMap.entrySet()) {
             String cacheName0 = ATOMICS_CACHE_NAME + "@" + e.getKey().groupName();
@@ -713,12 +717,24 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
         }
     }
 
-    public void restart(IgniteInternalCache cache) {
+
+    /**
+     * Would return this cache to normal work if it was suspened (and if it is atomics cache).
+     * @param cacheName To restart.
+     */
+    public void restart(String cacheName, IgniteInternalCache cache) {
         for (Map.Entry<GridCacheInternalKey, GridCacheRemovable> e : dsMap.entrySet()) {
             String cacheName0 = ATOMICS_CACHE_NAME + "@" + e.getKey().groupName();
 
-            if (cacheName0.equals(cache.name()))
-                e.getValue().restart(cache);
+            if (cacheName0.equals(cacheName)) {
+                if (cache != null)
+                    e.getValue().restart(cache);
+                else {
+                    e.getValue().onRemoved();
+
+                    dsMap.remove(e.getKey(), e.getValue());
+                }
+            }
         }
     }
 
index d26a153..aa66ae0 100644 (file)
@@ -35,7 +35,14 @@ public interface GridCacheRemovable {
      */
     public void needCheckNotRemoved();
 
+    /**
+     * Would suspend calls for this object.
+     */
     public void suspend();
 
+    /**
+     * Would return this object work to normal.
+     * @param cache To update with.
+     */
     public void restart(IgniteInternalCache cache);
 }
\ No newline at end of file
index 729f6eb..541ca30 100644 (file)
@@ -365,7 +365,7 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
 
             if (delegate.separated()) {
                 IgniteInternalFuture<Boolean> fut = cctx.kernalContext().cache().dynamicDestroyCache(
-                    cctx.cache().name(), false, true, false);
+                    cctx.cache().name(), false, true, false, null);
 
                 ((GridFutureAdapter)fut).ignoreInterrupts();
 
index 6e83bd3..e0b6a89 100644 (file)
@@ -19,11 +19,13 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -47,8 +49,10 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -189,7 +193,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
 
         GridTestUtils.runMultiThreaded(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, false, true, false));
+                futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, false, true, false, null));
 
                 return null;
             }
@@ -258,7 +262,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
             @Override public Object call() throws Exception {
                 IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount()));
 
-                futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, false, true, false));
+                futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, false, true, false, null));
 
                 return null;
             }
@@ -1366,4 +1370,62 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
 
         fut.get();
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCacheRestartIsAllowedOnlyToItsInititator() throws Exception {
+        IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount()));
+
+        CacheConfiguration ccfg = new CacheConfiguration("testCacheRestartIsAllowedOnlyToItsInititator");
+
+        kernal.createCache(ccfg);
+
+        IgniteUuid restartId = IgniteUuid.randomUuid();
+
+        kernal.context().cache().dynamicDestroyCache(ccfg.getName(), false, true, true, restartId)
+                .get(getTestTimeout(), TimeUnit.MILLISECONDS);
+
+        try {
+            kernal.createCache(ccfg);
+
+            fail();
+        }
+        catch (Exception e) {
+            assertTrue(X.hasCause(e, CacheExistsException.class));
+
+            System.out.println("User couldn't start new cache with the same name");
+        }
+
+        try {
+            kernal.context().cache().dynamicStartCache(ccfg, ccfg.getName(), null, true, false, true).get();
+
+            fail();
+        }
+        catch (Exception e) {
+            assertTrue(X.hasCause(e, CacheExistsException.class));
+
+            System.out.println("We couldn't start new cache with private API");
+        }
+
+        StoredCacheData storedCacheData = new StoredCacheData(ccfg);
+
+        try {
+            kernal.context().cache().dynamicStartCachesByStoredConf(Collections.singleton(storedCacheData), true, true, false, IgniteUuid.randomUuid()).get();
+
+            fail();
+        }
+        catch (Exception e) {
+            assertTrue(X.hasCause(e, CacheExistsException.class));
+
+            System.out.println("We couldn't start new cache with wrong restart id.");
+        }
+
+        kernal.context().cache().dynamicStartCachesByStoredConf(Collections.singleton(storedCacheData), true, true, false, restartId).get();
+
+        System.out.println("We successfully restarted cache with initial restartId.");
+
+        kernal.destroyCache(ccfg.getName());
+    }
 }
index f58e110..b4dae39 100644 (file)
@@ -134,7 +134,7 @@ public class IgnitePdsCacheStartStopWithFreqCheckpointTest extends GridCommonAbs
 
                 try {
                     // Stop cache without destroy.
-                    crd.context().cache().dynamicDestroyCaches(cacheNames, false, false, false).get();
+                    crd.context().cache().dynamicDestroyCaches(cacheNames, false,false).get();
                 }
                 catch (IgniteCheckedException e) {
                     throw new IgniteException("Failed to destroy cache", e);
index a1f47b6..e0ebc90 100644 (file)
 
 package org.apache.ignite.internal.processors.query.h2;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.Cache;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
@@ -43,20 +56,6 @@ import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.io.ParseException;
 import org.locationtech.jts.io.WKTReader;
 
-import javax.cache.Cache;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
 /**
  * Geo-indexing test.
  */
@@ -231,7 +230,7 @@ public abstract class H2IndexingAbstractGeoSelfTest extends GridCacheAbstractSel
         if (!dynamic)
             cache.destroy();
         else
-            grid.context().cache().dynamicDestroyCache(cache.getName(), true, true, false);
+            grid.context().cache().dynamicDestroyCache(cache.getName(), true, true, false, null);
     }
 
     /**
index 35ffd57..e7ab35d 100644 (file)
@@ -558,7 +558,7 @@ public abstract class AbstractSchemaSelfTest extends GridCommonAbstractTest {
      * @param node Node to create cache on.
      */
     protected void destroySqlCache(Ignite node) throws IgniteCheckedException {
-        ((IgniteEx)node).context().cache().dynamicDestroyCache(CACHE_NAME, true, true, false).get();
+        ((IgniteEx)node).context().cache().dynamicDestroyCache(CACHE_NAME, true, true, false, null).get();
     }
 
     /**
index 176744b..7bc95c7 100644 (file)
@@ -81,7 +81,7 @@ public abstract class DynamicIndexAbstractBasicSelfTest extends DynamicIndexAbst
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
-        node().context().cache().dynamicDestroyCache(CACHE_NAME, true, true, false).get();
+        node().context().cache().dynamicDestroyCache(CACHE_NAME, true, true, false, null).get();
 
         super.afterTest();
     }