IGNITE-7676 Add affinity version to snapshot plugin stub - Fixes #3510.
authorEdShangGG <eshangareev@gridgain.com>
Fri, 16 Feb 2018 13:29:49 +0000 (16:29 +0300)
committerAlexey Goncharuk <alexey.goncharuk@gmail.com>
Fri, 16 Feb 2018 13:29:49 +0000 (16:29 +0300)
Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java

index 73e6417..7a1265e 100644 (file)
@@ -1136,7 +1136,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         try {
             long start = U.currentTimeMillis();
 
-            IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(firstDiscoEvt);
+            IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(firstDiscoEvt, exchId.topologyVersion());
 
             if (fut != null) {
                 fut.get();
index e53bd44..fde5fb4 100755 (executable)
@@ -2092,54 +2092,56 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      * @param partStates Partition to restore state.
      */
     public void applyUpdatesOnRecovery(
-        WALIterator it,
+        @Nullable WALIterator it,
         IgnitePredicate<IgniteBiTuple<WALPointer, WALRecord>> recPredicate,
         IgnitePredicate<DataEntry> entryPredicate,
         Map<T2<Integer, Integer>, T2<Integer, Long>> partStates
     ) throws IgniteCheckedException {
-        while (it.hasNextX()) {
-            IgniteBiTuple<WALPointer, WALRecord> next = it.nextX();
+        if (it != null) {
+            while (it.hasNextX()) {
+                IgniteBiTuple<WALPointer, WALRecord> next = it.nextX();
 
-            WALRecord rec = next.get2();
+                WALRecord rec = next.get2();
 
-            if (!recPredicate.apply(next))
-                break;
+                if (!recPredicate.apply(next))
+                    break;
 
-            switch (rec.type()) {
-                case DATA_RECORD:
-                    checkpointReadLock();
+                switch (rec.type()) {
+                    case DATA_RECORD:
+                        checkpointReadLock();
 
-                    try {
-                        DataRecord dataRec = (DataRecord) rec;
+                        try {
+                            DataRecord dataRec = (DataRecord)rec;
 
-                        for (DataEntry dataEntry : dataRec.writeEntries()) {
-                            if (entryPredicate.apply(dataEntry)) {
-                                checkpointReadLock();
+                            for (DataEntry dataEntry : dataRec.writeEntries()) {
+                                if (entryPredicate.apply(dataEntry)) {
+                                    checkpointReadLock();
 
-                                try {
-                                    int cacheId = dataEntry.cacheId();
+                                    try {
+                                        int cacheId = dataEntry.cacheId();
 
-                                    GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+                                        GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
-                                    if (cacheCtx != null)
-                                        applyUpdate(cacheCtx, dataEntry);
-                                    else if (log != null)
-                                        log.warning("Cache (cacheId=" + cacheId + ") is not started, can't apply updates.");
-                                }
-                                finally {
-                                    checkpointReadUnlock();
+                                        if (cacheCtx != null)
+                                            applyUpdate(cacheCtx, dataEntry);
+                                        else if (log != null)
+                                            log.warning("Cache (cacheId=" + cacheId + ") is not started, can't apply updates.");
+                                    }
+                                    finally {
+                                        checkpointReadUnlock();
+                                    }
                                 }
                             }
                         }
-                    }
-                    finally {
-                        checkpointReadUnlock();
-                    }
+                        finally {
+                            checkpointReadUnlock();
+                        }
 
-                    break;
+                        break;
 
-                default:
-                    // Skip other records.
+                    default:
+                        // Skip other records.
+                }
             }
         }
 
index c23adda..16cc8f5 100644 (file)
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
@@ -48,9 +49,12 @@ public class IgniteCacheSnapshotManager<T extends SnapshotOperation> extends Gri
      * Try to start local snapshot operation if it's required by discovery event.
      *
      * @param discoveryEvent Discovery event.
+     * @param topVer topology version on the moment when this method was called
+     *
+     * @throws IgniteCheckedException if failed
      */
     @Nullable public IgniteInternalFuture tryStartLocalSnapshotOperation(
-            @Nullable DiscoveryEvent discoveryEvent
+            @Nullable DiscoveryEvent discoveryEvent, AffinityTopologyVersion topVer
     ) throws IgniteCheckedException {
         return null;
     }
@@ -61,7 +65,8 @@ public class IgniteCacheSnapshotManager<T extends SnapshotOperation> extends Gri
      */
     @Nullable public IgniteInternalFuture startLocalSnapshotOperation(
         UUID initiatorNodeId,
-        T snapshotOperation
+        T snapshotOperation,
+        AffinityTopologyVersion topVer
     ) throws IgniteCheckedException {
         return null;
     }