IGNITE-7569 Fixed index rebuild future - Fixes #3454.
authorAlexey Goncharuk <alexey.goncharuk@gmail.com>
Wed, 31 Jan 2018 08:22:26 +0000 (11:22 +0300)
committerAlexey Goncharuk <alexey.goncharuk@gmail.com>
Wed, 31 Jan 2018 08:22:26 +0000 (11:22 +0300)
Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java

index 6c09b6a..a45c9b9 100644 (file)
@@ -168,7 +168,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** */
     private AtomicBoolean added = new AtomicBoolean(false);
 
-    /** Event latch. */
+    /**
+     * Discovery event receive latch. There is a race between discovery event processing and single message
+     * processing, so it is possible to create an exchange future before the actual discovery event is received.
+     * This latch is notified when the discovery event arrives.
+     */
     @GridToStringExclude
     private final CountDownLatch evtLatch = new CountDownLatch(1);
 
@@ -344,6 +348,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Sets exchange actions associated with the exchange future (such as cache start or stop).
+     * Exchange actions is created from discovery event, so the actions must be set before the event is processed,
+     * thus the setter requires that {@code evtLatch} be armed.
+     *
      * @param exchActions Exchange actions.
      */
     public void exchangeActions(ExchangeActions exchActions) {
@@ -354,6 +362,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Gets exchanges actions (such as cache start or stop) associated with the exchange future.
+     * Exchange actions can be {@code null} (for example, if the exchange is created for topology
+     * change event).
+     *
+     * @return Exchange actions.
+     */
+    @Nullable public ExchangeActions exchangeActions() {
+        return exchActions;
+    }
+
+    /**
+     * Sets affinity change message associated with the exchange. Affinity change message is required when
+     * centralized affinity change is performed.
+     *
      * @param affChangeMsg Affinity change message.
      */
     public void affinityChangeMessage(CacheAffinityChangeMessage affChangeMsg) {
@@ -361,9 +383,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Gets the affinity topology version for which this exchange was created. If several exchanges
+     * were merged, initial version is the version of the earliest merged exchange.
+     *
      * @return Initial exchange version.
      */
-    public AffinityTopologyVersion initialVersion() {
+    @Override public AffinityTopologyVersion initialVersion() {
         return exchId.topologyVersion();
     }
 
index f833911..0b35f18 100755 (executable)
@@ -105,6 +105,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRec
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.ExchangeActions;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
@@ -320,8 +321,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. */
     private ThreadLocal<ByteBuffer> threadBuf;
 
-    /** */
-    private final ConcurrentMap<Integer, IgniteInternalFuture> idxRebuildFuts = new ConcurrentHashMap<>();
+    /** Map from a cacheId to a future indicating that there is an in-progress index rebuild for the given cache. */
+    private final ConcurrentMap<Integer, GridFutureAdapter<Void>> idxRebuildFuts = new ConcurrentHashMap<>();
 
     /**
      * Lock holder for compatible folders mode. Null if lock holder was created at start node. <br>
@@ -1127,33 +1128,59 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         // Before local node join event.
         if (clusterInTransitionStateToActive || (joinEvt && locNode && isSrvNode))
             restoreState();
+
+        if (cctx.kernalContext().query().moduleEnabled()) {
+            ExchangeActions acts = fut.exchangeActions();
+
+            if (acts != null && !F.isEmpty(acts.cacheStartRequests())) {
+                for (ExchangeActions.CacheActionData actionData : acts.cacheStartRequests()) {
+                    int cacheId = CU.cacheId(actionData.request().cacheName());
+
+                    GridFutureAdapter<Void> old = idxRebuildFuts.put(cacheId, new GridFutureAdapter<>());
+
+                    if (old != null)
+                        old.onDone();
+                }
+            }
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture fut) {
         if (cctx.kernalContext().query().moduleEnabled()) {
             for (final GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) {
-                if (cacheCtx.startTopologyVersion().equals(fut.initialVersion()) &&
-                    !cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && cacheCtx.affinityNode()) {
+                if (cacheCtx.startTopologyVersion().equals(fut.initialVersion())) {
                     final int cacheId = cacheCtx.cacheId();
+                    final GridFutureAdapter<Void> usrFut = idxRebuildFuts.get(cacheId);
+
+                    if (!cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && cacheCtx.affinityNode()) {
+                        IgniteInternalFuture<?> rebuildFut = cctx.kernalContext().query()
+                            .rebuildIndexesFromHash(Collections.singletonList(cacheCtx.cacheId()));
 
-                    final IgniteInternalFuture<?> rebuildFut = cctx.kernalContext().query()
-                        .rebuildIndexesFromHash(Collections.singletonList(cacheCtx.cacheId()));
+                        assert usrFut != null : "Missing user future for cache: " + cacheCtx.name();
 
-                    idxRebuildFuts.put(cacheId, rebuildFut);
+                        rebuildFut.listen(new CI1<IgniteInternalFuture>() {
+                            @Override public void apply(IgniteInternalFuture igniteInternalFut) {
+                                idxRebuildFuts.remove(cacheId, usrFut);
 
-                    rebuildFut.listen(new CI1<IgniteInternalFuture>() {
-                        @Override public void apply(IgniteInternalFuture igniteInternalFut) {
-                            idxRebuildFuts.remove(cacheId, rebuildFut);
+                                usrFut.onDone(igniteInternalFut.error());
 
-                            CacheConfiguration ccfg = cacheCtx.config();
+                                CacheConfiguration ccfg = cacheCtx.config();
 
-                            if (ccfg != null) {
-                                log().info("Finished indexes rebuilding for cache [name=" + ccfg.getName()
-                                    + ", grpName=" + ccfg.getGroupName() + ']');
+                                if (ccfg != null) {
+                                    log().info("Finished indexes rebuilding for cache [name=" + ccfg.getName()
+                                        + ", grpName=" + ccfg.getGroupName() + ']');
+                                }
                             }
+                        });
+                    }
+                    else {
+                        if (usrFut != null) {
+                            idxRebuildFuts.remove(cacheId, usrFut);
+
+                            usrFut.onDone();
                         }
-                    });
+                    }
                 }
             }
         }