IGNITE-10507 Control.sh: add ability to check CRC sums of stored pages - Fixes #5803.
author Sergey Antonov <antonovsergey93@gmail.com>
Fri, 18 Jan 2019 10:54:36 +0000 (13:54 +0300)
committer Ivan Rakov <irakov@apache.org>
Fri, 18 Jan 2019 10:54:36 +0000 (13:54 +0300)
Signed-off-by: Ivan Rakov <irakov@apache.org>
23 files changed:
modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheArguments.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/GridNotIdleException.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyException.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsDumpTask.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyDumpTaskArg.java
modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyJob.java
modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyTaskArg.java
modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorIdleVerifyTaskV2.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/memtracker/PageMemoryTracker.java
modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgnitePersistentStoreSchemaLoadTest.java

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
index bf596e7..b84e2b9 100644
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.commandline;
 import java.io.Console;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -198,7 +199,7 @@ public class CommandHandler {
     private static final String CMD_SKIP_ZEROS = "--skip-zeros";
 
     /** Command exclude caches. */
-    private static final String CMD_EXCLUDE_CACHES = "--excludeCaches";
+    private static final String CMD_EXCLUDE_CACHES = "--exclude-caches";
 
     /** Cache filter. */
     private static final String CACHE_FILTER = "--cache-filter";
@@ -368,6 +369,9 @@ public class CommandHandler {
     /** */
     private static final String CONFIG = "--config";
 
+    /** */
+    private static final String IDLE_CHECK_CRC = "--check-crc";
+
     /** Utility name. */
     private static final String UTILITY_NAME = "control.sh";
 
@@ -1041,7 +1045,10 @@ public class CommandHandler {
      */
     private void legacyCacheIdleVerify(GridClient client, CacheArguments cacheArgs) throws GridClientException {
         VisorIdleVerifyTaskResult res = executeTask(
-            client, VisorIdleVerifyTask.class, new VisorIdleVerifyTaskArg(cacheArgs.caches(), cacheArgs.excludeCaches()));
+            client,
+            VisorIdleVerifyTask.class,
+            new VisorIdleVerifyTaskArg(cacheArgs.caches(), cacheArgs.excludeCaches(), cacheArgs.idleCheckCrc())
+        );
 
         Map<PartitionKey, List<PartitionHashRecord>> conflicts = res.getConflicts();
 
@@ -1174,7 +1181,6 @@ public class CommandHandler {
      * @param cacheArgs Cache args.
      */
     private void cacheResetLostPartitions(GridClient client, CacheArguments cacheArgs) throws GridClientException {
-
         CacheResetLostPartitionsTaskArg taskArg = new CacheResetLostPartitionsTaskArg(cacheArgs.caches());
 
         CacheResetLostPartitionsTaskResult res = executeTaskByNameOnNode(client, CacheResetLostPartitionsTask.class.getName(), taskArg, null);
@@ -1187,13 +1193,16 @@ public class CommandHandler {
      * @param cacheArgs Cache args.
      */
     private void cacheIdleVerifyDump(GridClient client, CacheArguments cacheArgs) throws GridClientException {
-        String path = executeTask(
-            client,
-            VisorIdleVerifyDumpTask.class,
-            new VisorIdleVerifyDumpTaskArg(cacheArgs.caches(), cacheArgs.excludeCaches(), cacheArgs.isSkipZeros(), cacheArgs
-                .getCacheFilterEnum())
+        VisorIdleVerifyDumpTaskArg arg = new VisorIdleVerifyDumpTaskArg(
+            cacheArgs.caches(),
+            cacheArgs.excludeCaches(),
+            cacheArgs.isSkipZeros(),
+            cacheArgs.getCacheFilterEnum(),
+            cacheArgs.idleCheckCrc()
         );
 
+        String path = executeTask(client, VisorIdleVerifyDumpTask.class, arg);
+
         log("VisorIdleVerifyDumpTask successfully written output to '" + path + "'");
     }
 
@@ -1203,7 +1212,10 @@ public class CommandHandler {
      */
     private void cacheIdleVerifyV2(GridClient client, CacheArguments cacheArgs) throws GridClientException {
         IdleVerifyResultV2 res = executeTask(
-            client, VisorIdleVerifyTaskV2.class, new VisorIdleVerifyTaskArg(cacheArgs.caches(), cacheArgs.excludeCaches()));
+            client,
+            VisorIdleVerifyTaskV2.class,
+            new VisorIdleVerifyTaskArg(cacheArgs.caches(), cacheArgs.excludeCaches(), cacheArgs.idleCheckCrc())
+        );
 
         res.print(System.out::print);
     }
@@ -2145,6 +2157,8 @@ public class CommandHandler {
                     }
                     else if (CMD_SKIP_ZEROS.equals(nextArg))
                         cacheArgs.skipZeros(true);
+                    else if (IDLE_CHECK_CRC.equals(nextArg))
+                        cacheArgs.idleCheckCrc(true);
                     else if (CACHE_FILTER.equals(nextArg)) {
                         if (cacheArgs.caches() != null || cacheArgs.excludeCaches() != null)
                             throw new IllegalArgumentException(ONE_CACHE_FILTER_OPT_SHOULD_USED_MSG);
@@ -2756,6 +2770,7 @@ public class CommandHandler {
         log("Control utility [ver. " + ACK_VER_STR + "]");
         log(COPYRIGHT);
         log("User: " + System.getProperty("user.name"));
+        log("Time: " + LocalDateTime.now());
         log(DELIM);
 
         try {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheArguments.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/cache/CacheArguments.java
index fb5bc88..1407bef 100644
@@ -78,6 +78,9 @@ public class CacheArguments {
     /** Cache filter. */
     private CacheFilterEnum cacheFilterEnum = CacheFilterEnum.ALL;
 
+    /** Check CRC sum on idle verify. */
+    private boolean idleCheckCrc;
+
     /**
      * @return Gets filter of caches, which will by checked.
      */
@@ -307,4 +310,18 @@ public class CacheArguments {
      * @param outputFormat New output format.
      */
     public void outputFormat(OutputFormat outputFormat) { this.outputFormat = outputFormat; }
+
+    /**
+     * @return Check page CRC sum on idle verify flag.
+     */
+    public boolean idleCheckCrc() {
+        return idleCheckCrc;
+    }
+
+    /**
+     * @param idleCheckCrc Check page CRC sum on idle verify flag.
+     */
+    public void idleCheckCrc(boolean idleCheckCrc) {
+        this.idleCheckCrc = idleCheckCrc;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLog.java
index ca3053c..6b4d4fb 100644
@@ -188,7 +188,7 @@ public class TxLog implements DbCheckpointListener {
     }
 
     /** {@inheritDoc} */
-    @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
+    @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
         Executor executor = ctx.executor();
 
         if (executor == null)
@@ -205,6 +205,11 @@ public class TxLog implements DbCheckpointListener {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
+        /* No-op. */
+    }
+
     /**
      *
      * @param major Major version.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DbCheckpointListener.java
index d64bbfe..bf2f13a 100644
@@ -49,10 +49,20 @@ public interface DbCheckpointListener {
          * @return Context executor.
          */
         @Nullable public Executor executor();
+
+        /**
+         * @return {@code True} if at least one page is dirty.
+         */
+        public boolean hasPages();
     }
 
     /**
      * @throws IgniteCheckedException If failed.
      */
+    public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException;
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
     public void onCheckpointBegin(Context ctx) throws IgniteCheckedException;
 }
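Note on the listener contract above: it is now two-phase. onMarkCheckpointBegin() is invoked under the checkpoint write lock (where the old onCheckpointBegin() calls moved), while onCheckpointBegin() is invoked after the lock is released and is the only phase in which Context.hasPages() is defined. Below is a minimal sketch written against just this interface; the class name is hypothetical, and the pattern mirrors the listener registered in VerifyBackupPartitionsTaskV2 further down.

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;

/** Sketch: records whether a checkpoint with at least one dirty page has begun. */
public class DirtyCheckpointFlagListener implements DbCheckpointListener {
    /** Flipped to {@code true} once a checkpoint with dirty pages begins. */
    private final AtomicBoolean cpWithDirtyPages = new AtomicBoolean();

    /** {@inheritDoc} */
    @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
        // No-op: Context.hasPages() is not defined yet in this phase.
    }

    /** {@inheritDoc} */
    @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
        if (ctx.hasPages())
            cpWithDirtyPages.set(true);
    }

    /** @return {@code True} if a checkpoint with dirty pages has begun since registration. */
    public boolean dirtyCheckpointSeen() {
        return cpWithDirtyPages.get();
    }
}

Such a listener would be registered and removed through GridCacheDatabaseSharedManager.addCheckpointListener() / removeCheckpointListener(), as the idle verify job does below.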
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 1ff982d..ce36bab 100755
@@ -3085,6 +3085,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             tmpWriteBuf.order(ByteOrder.nativeOrder());
         }
 
+        /**
+         * @return Progress of the current checkpoint, or {@code null} if no checkpoint is in progress at the moment.
+         */
+        public @Nullable CheckpointProgress currentProgress() {
+            return curCpProgress;
+        }
+
         /** {@inheritDoc} */
         @Override protected void body() {
             Throwable err = null;
@@ -3604,6 +3611,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
             checkpointLock.writeLock().lock();
 
+            DbCheckpointListener.Context ctx0 = null;
+
             try {
                 tracker.onMarkStart();
 
@@ -3625,41 +3634,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                 GridCompoundFuture asyncLsnrFut = asyncRunner == null ? null : new GridCompoundFuture();
 
-                DbCheckpointListener.Context ctx0 = new DbCheckpointListener.Context() {
-                    @Override public boolean nextSnapshot() {
-                        return curr.nextSnapshot;
-                    }
-
-                    /** {@inheritDoc} */
-                    @Override public PartitionAllocationMap partitionStatMap() {
-                        return map;
-                    }
-
-                    /** {@inheritDoc} */
-                    @Override public boolean needToSnapshot(String cacheOrGrpName) {
-                        return curr.snapshotOperation.cacheGroupIds().contains(CU.cacheId(cacheOrGrpName));
-                    }
-
-                    /** {@inheritDoc} */
-                    @Override public Executor executor() {
-                        return asyncRunner == null ? null : cmd -> {
-                            try {
-                                GridFutureAdapter<?> res = new GridFutureAdapter<>();
-
-                                asyncRunner.execute(U.wrapIgniteFuture(cmd, res));
-
-                                asyncLsnrFut.add(res);
-                            }
-                            catch (RejectedExecutionException e) {
-                                assert false : "A task should never be rejected by async runner";
-                            }
-                        };
-                    }
-                };
+                ctx0 = createOnCheckpointMarkBeginContext(curr, map, asyncLsnrFut);
 
                 // Listeners must be invoked before we write checkpoint record to WAL.
                 for (DbCheckpointListener lsnr : lsnrs)
-                    lsnr.onCheckpointBegin(ctx0);
+                    lsnr.onMarkCheckpointBegin(ctx0);
 
                 if (asyncLsnrFut != null) {
                     asyncLsnrFut.markInitialized();
@@ -3751,8 +3730,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 tracker.onLockRelease();
             }
 
+            DbCheckpointListener.Context ctx = createOnCheckpointBeginContext(ctx0, hasPages);
+
             curr.cpBeginFut.onDone();
 
+            for (DbCheckpointListener lsnr : lsnrs)
+                lsnr.onCheckpointBegin(ctx);
+
             if (snapFut != null) {
                 try {
                     snapFut.get();
@@ -3812,6 +3796,86 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             }
         }
 
+        /** */
+        private DbCheckpointListener.Context createOnCheckpointBeginContext(
+            DbCheckpointListener.Context delegate,
+            boolean hasPages
+        ) {
+            return new DbCheckpointListener.Context() {
+                /** {@inheritDoc} */
+                @Override public boolean nextSnapshot() {
+                    return delegate.nextSnapshot();
+                }
+
+                /** {@inheritDoc} */
+                @Override public PartitionAllocationMap partitionStatMap() {
+                    return delegate.partitionStatMap();
+                }
+
+                /** {@inheritDoc} */
+                @Override public boolean needToSnapshot(String cacheOrGrpName) {
+                    return delegate.needToSnapshot(cacheOrGrpName);
+                }
+
+                /** {@inheritDoc} */
+                @Override public @Nullable Executor executor() {
+                    return delegate.executor();
+                }
+
+                /** {@inheritDoc} */
+                @Override public boolean hasPages() {
+                    return hasPages;
+                }
+            };
+        }
+
+        /** */
+        private DbCheckpointListener.Context createOnCheckpointMarkBeginContext(
+            CheckpointProgress currCpProgress,
+            PartitionAllocationMap map,
+            GridCompoundFuture asyncLsnrFut
+        ) {
+            return new DbCheckpointListener.Context() {
+                /** {@inheritDoc} */
+                @Override public boolean nextSnapshot() {
+                    return currCpProgress.nextSnapshot;
+                }
+
+                /** {@inheritDoc} */
+                @Override public PartitionAllocationMap partitionStatMap() {
+                    return map;
+                }
+
+                /** {@inheritDoc} */
+                @Override public boolean needToSnapshot(String cacheOrGrpName) {
+                    return currCpProgress.snapshotOperation.cacheGroupIds().contains(CU.cacheId(cacheOrGrpName));
+                }
+
+                /** {@inheritDoc} */
+                @Override public Executor executor() {
+                    return asyncRunner == null ? null : cmd -> {
+                        try {
+                            GridFutureAdapter<?> res = new GridFutureAdapter<>();
+
+                            asyncRunner.execute(U.wrapIgniteFuture(cmd, res));
+
+                            asyncLsnrFut.add(res);
+                        }
+                        catch (RejectedExecutionException e) {
+                            throw new IgniteException("A task should never be rejected by async runner", e);
+                        }
+                    };
+                }
+
+                /** {@inheritDoc} */
+                @Override public boolean hasPages() {
+                    throw new IllegalStateException(
+                        "This property is unknown at this point. Use the onCheckpointBegin() method instead."
+                    );
+                }
+            };
+        }
+
         /**
          * Check that at least one collection is not empty.
          *
@@ -4257,7 +4321,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /**
      * Data class representing the state of running/scheduled checkpoint.
      */
-    private static class CheckpointProgress {
+    public static class CheckpointProgress {
         /** Scheduled time of checkpoint. */
         private volatile long nextCpTs;
 
@@ -4295,6 +4359,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         private CheckpointProgress(long nextCpTs) {
             this.nextCpTs = nextCpTs;
         }
+
+        /** */
+        public boolean started() {
+            return cpBeginFut.isDone();
+        }
+
+        /** */
+        public boolean finished() {
+            return cpFinishFut.isDone();
+        }
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index f78428d..9d11e76 100644
@@ -173,6 +173,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
     /** {@inheritDoc} */
     @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
+        /* No-op. */
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
         assert grp.dataRegion().pageMemory() instanceof PageMemoryEx;
 
         Executor execSvc = ctx.executor();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java
index a8fae08..8e0196e 100755
@@ -714,6 +714,13 @@ public class FilePageStore implements PageStore {
     }
 
     /**
+     * @return File absolute path.
+     */
+    public String getFileAbsolutePath() {
+        return cfgFile.getAbsolutePath();
+    }
+
+    /**
      *
      */
     private long allocPage() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index 0906605..dff2a03 100644
@@ -571,7 +571,7 @@ public class MetaStorage implements DbCheckpointListener, ReadWriteMetastorage {
     }
 
     /** {@inheritDoc} */
-    @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
+    @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
         Executor executor = ctx.executor();
 
         if (executor == null) {
@@ -600,6 +600,11 @@ public class MetaStorage implements DbCheckpointListener, ReadWriteMetastorage {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
+        /* No-op. */
+    }
+
     /**
      * @throws IgniteCheckedException If failed.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/GridNotIdleException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/GridNotIdleException.java
new file mode 100644
index 0000000..8ea38f6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/GridNotIdleException.java
@@ -0,0 +1,69 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.verify;
+
+import org.apache.ignite.IgniteException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This exception defines not idle cluster state, when idle state expected.
+ */
+public class GridNotIdleException extends IgniteException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Create empty exception.
+     */
+    public GridNotIdleException() {
+        // No-op.
+    }
+
+    /**
+     * Creates new exception with given error message.
+     *
+     * @param msg Error message.
+     */
+    public GridNotIdleException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates new exception with given throwable as a cause and source of error message.
+     *
+     * @param cause Non-null throwable cause.
+     */
+    public GridNotIdleException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates new exception with given error message and optional nested exception.
+     *
+     * @param msg Error message.
+     * @param cause Optional nested exception (can be {@code null}).
+     */
+    public GridNotIdleException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return getClass() + ": " + getMessage();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyException.java
new file mode 100644
index 0000000..a6d8a3f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyException.java
@@ -0,0 +1,61 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.verify;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * This exception is used to collect exceptions that occurred during {@link VerifyBackupPartitionsTaskV2} execution.
+ */
+public class IdleVerifyException extends IgniteException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Occurred exceptions. */
+    private final Collection<IgniteException> exceptions;
+
+    /** */
+    public IdleVerifyException(Collection<IgniteException> exceptions) {
+        if (F.isEmpty(exceptions))
+            throw new IllegalArgumentException("Exceptions can't be empty!");
+
+        this.exceptions = exceptions;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getMessage() {
+        return exceptions.stream()
+            .map(Throwable::getMessage)
+            .collect(Collectors.joining(", "));
+    }
+
+    /**
+     * @return Exceptions.
+     */
+    public Collection<IgniteException> exceptions() {
+        return exceptions;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return getClass() + ": " + getMessage();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyResultV2.java
index a153063..3ccfe00 100644
  */
 package org.apache.ignite.internal.processors.cache.verify;
 
+import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.io.PrintWriter;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.function.Consumer;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorDataTransferObject;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.commandline.cache.CacheCommand.IDLE_VERIFY;
 
 /**
  * Encapsulates result of {@link VerifyBackupPartitionsTaskV2}.
  */
 public class IdleVerifyResultV2 extends VisorDataTransferObject {
     /** */
+    public static final String IDLE_VERIFY_FILE_PREFIX = IDLE_VERIFY + "-";
+
+    /** Time formatter for log file name. */
+    private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss_SSS");
+
+    /** */
     private static final long serialVersionUID = 0L;
 
     /** Counter conflicts. */
@@ -45,18 +59,19 @@ public class IdleVerifyResultV2 extends VisorDataTransferObject {
     private Map<PartitionKeyV2, List<PartitionHashRecordV2>> movingPartitions;
 
     /** Exceptions. */
-    private Map<UUID, Exception> exceptions;
+    private Map<ClusterNode, Exception> exceptions;
 
     /**
      * @param cntrConflicts Counter conflicts.
      * @param hashConflicts Hash conflicts.
      * @param movingPartitions Moving partitions.
+     * @param exceptions Occurred exceptions.
      */
     public IdleVerifyResultV2(
         Map<PartitionKeyV2, List<PartitionHashRecordV2>> cntrConflicts,
         Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashConflicts,
         Map<PartitionKeyV2, List<PartitionHashRecordV2>> movingPartitions,
-        Map<UUID, Exception> exceptions
+        Map<ClusterNode, Exception> exceptions
     ) {
         this.cntrConflicts = cntrConflicts;
         this.hashConflicts = hashConflicts;
@@ -125,51 +140,54 @@ public class IdleVerifyResultV2 extends VisorDataTransferObject {
     /**
      * @return Exceptions on nodes.
      */
-    public Map<UUID, Exception> exceptions() {
+    public Map<ClusterNode, Exception> exceptions() {
         return exceptions;
     }
 
     /**
-     * Print formatted result to given printer.
+     * Print formatted result to the given printer. If exceptions are present, their messages will also be written to a log file.
      *
      * @param printer Consumer for handle formatted result.
+     * @return Path to the log file if exceptions are present, {@code null} otherwise.
      */
-    public void print(Consumer<String> printer) {
-        if (!hasConflicts())
-            printer.accept("idle_verify check has finished, no conflicts have been found.\n");
-        else {
-            int cntrConflictsSize = counterConflicts().size();
-            int hashConflictsSize = hashConflicts().size();
+    public @Nullable String print(Consumer<String> printer) {
+        print(printer, false);
 
-            printer.accept("idle_verify check has finished, found " + (cntrConflictsSize + hashConflictsSize) +
-                " conflict partitions: [counterConflicts=" + cntrConflictsSize + ", hashConflicts=" +
-                hashConflictsSize + "]\n");
+        if (!F.isEmpty(exceptions)) {
+            File f = new File(IDLE_VERIFY_FILE_PREFIX + LocalDateTime.now().format(TIME_FORMATTER) + ".txt");
 
-            if (!F.isEmpty(counterConflicts())) {
-                printer.accept("Update counter conflicts:\n");
+            try (PrintWriter pw = new PrintWriter(f)) {
+                print(pw::write, true);
 
-                for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : counterConflicts().entrySet()) {
-                    printer.accept("Conflict partition: " + entry.getKey() + "\n");
+                pw.flush();
 
-                    printer.accept("Partition instances: " + entry.getValue() + "\n");
-                }
+                printer.accept("See log for additional information. " + f.getAbsolutePath() + "\n");
 
-                printer.accept("\n");
+                return f.getAbsolutePath();
             }
+            catch (FileNotFoundException e) {
+                printer.accept("Can't write exceptions to file " + f.getAbsolutePath() + " " + e.getMessage() + "\n");
 
-            if (!F.isEmpty(hashConflicts())) {
-                printer.accept("Hash conflicts:\n");
+                e.printStackTrace();
+            }
+        }
 
-                for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : hashConflicts().entrySet()) {
-                    printer.accept("Conflict partition: " + entry.getKey() + "\n");
+        return null;
+    }
 
-                    printer.accept("Partition instances: " + entry.getValue() + "\n");
-                }
+    /** */
+    private void print(Consumer<String> printer, boolean printExceptionMessages) {
+        if (!F.isEmpty(exceptions)) {
+            int size = exceptions.size();
 
-                printer.accept("\n");
-            }
+            printer.accept("idle_verify failed on " + size + " node" + (size == 1 ? "" : "s") + ".\n");
         }
 
+        if (!hasConflicts())
+            printer.accept("idle_verify check has finished, no conflicts have been found.\n");
+        else
+            printConflicts(printer);
+
         if (!F.isEmpty(movingPartitions())) {
             printer.accept("Verification was skipped for " + movingPartitions().size() + " MOVING partitions:\n");
 
@@ -185,11 +203,51 @@ public class IdleVerifyResultV2 extends VisorDataTransferObject {
         if (!F.isEmpty(exceptions())) {
             printer.accept("Idle verify failed on nodes:\n");
 
-            for (Map.Entry<UUID, Exception> e : exceptions().entrySet()) {
-                printer.accept("Node ID: " + e.getKey() + "\n");
-                printer.accept("Exception message:" + "\n");
-                printer.accept(e.getValue().getMessage() + "\n");
+            for (Map.Entry<ClusterNode, Exception> e : exceptions().entrySet()) {
+                ClusterNode n = e.getKey();
+
+                printer.accept("Node ID: " + n.id() + " " + n.addresses() + " consistent ID: " + n.consistentId() + "\n");
+
+                if (printExceptionMessages) {
+                    printer.accept("Exception message:" + "\n");
+
+                    printer.accept(e.getValue().getMessage() + "\n");
+                }
+            }
+        }
+    }
+
+    /** */
+    private void printConflicts(Consumer<String> printer) {
+        int cntrConflictsSize = counterConflicts().size();
+        int hashConflictsSize = hashConflicts().size();
+
+        printer.accept("idle_verify check has finished, found " + (cntrConflictsSize + hashConflictsSize) +
+            " conflict partitions: [counterConflicts=" + cntrConflictsSize + ", hashConflicts=" +
+            hashConflictsSize + "]\n");
+
+        if (!F.isEmpty(counterConflicts())) {
+            printer.accept("Update counter conflicts:\n");
+
+            for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : counterConflicts().entrySet()) {
+                printer.accept("Conflict partition: " + entry.getKey() + "\n");
+
+                printer.accept("Partition instances: " + entry.getValue() + "\n");
+            }
+
+            printer.accept("\n");
+        }
+
+        if (!F.isEmpty(hashConflicts())) {
+            printer.accept("Hash conflicts:\n");
+
+            for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : hashConflicts().entrySet()) {
+                printer.accept("Conflict partition: " + entry.getKey() + "\n");
+
+                printer.accept("Partition instances: " + entry.getValue() + "\n");
             }
+
+            printer.accept("\n");
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
new file mode 100644
index 0000000..d386ec3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java
@@ -0,0 +1,120 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.verify;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Utility class for idle verify command.
+ */
+public class IdleVerifyUtility {
+    /** Cluster not idle message. */
+    public static final String CLUSTER_NOT_IDLE_MSG = "Checkpoint with dirty pages started! Cluster not idle!";
+
+    /**
+     * See {@link IdleVerifyUtility#checkPartitionsPageCrcSum(FilePageStore, CacheGroupContext, int, byte,
+     * AtomicBoolean)}.
+     */
+    public static void checkPartitionsPageCrcSum(
+        @Nullable FilePageStoreManager pageStoreMgr,
+        CacheGroupContext grpCtx,
+        int partId,
+        byte pageType,
+        AtomicBoolean cpFlag
+    ) throws IgniteCheckedException, GridNotIdleException {
+        if (!grpCtx.persistenceEnabled() || pageStoreMgr == null)
+            return;
+
+        FilePageStore pageStore = (FilePageStore)pageStoreMgr.getStore(grpCtx.groupId(), partId);
+
+        checkPartitionsPageCrcSum(pageStore, grpCtx, partId, pageType, cpFlag);
+    }
+
+    /**
+     * Checks the CRC sums of pages of the given {@code pageType} stored in the partition with id {@code partId} and
+     * associated with the cache group. <br/> The method should be invoked only on an idle cluster!
+     *
+     * @param pageStore Page store.
+     * @param grpCtx Cache group context.
+     * @param partId Partition id.
+     * @param pageType Page type. Possible types: {@link PageIdAllocator#FLAG_DATA}, {@link PageIdAllocator#FLAG_IDX}.
+     * @param cpFlag Checkpoint flag for detecting a checkpoint started with dirty pages.
+     * @throws IgniteCheckedException If page reading failed.
+     * @throws GridNotIdleException If the cluster is not idle.
+     */
+    public static void checkPartitionsPageCrcSum(
+        FilePageStore pageStore,
+        CacheGroupContext grpCtx,
+        int partId,
+        byte pageType,
+        AtomicBoolean cpFlag
+    ) throws IgniteCheckedException, GridNotIdleException {
+        assert pageType == PageIdAllocator.FLAG_DATA || pageType == PageIdAllocator.FLAG_IDX : pageType;
+
+        long pageId = PageIdUtils.pageId(partId, pageType, 0);
+
+        ByteBuffer buf = ByteBuffer.allocateDirect(grpCtx.dataRegion().pageMemory().pageSize());
+
+        buf.order(ByteOrder.nativeOrder());
+
+        for (int pageNo = 0; pageNo < pageStore.pages(); pageId++, pageNo++) {
+            buf.clear();
+
+            if (cpFlag.get())
+                throw new GridNotIdleException(CLUSTER_NOT_IDLE_MSG);
+
+            pageStore.read(pageId, buf, true);
+        }
+
+        if (cpFlag.get())
+            throw new GridNotIdleException(CLUSTER_NOT_IDLE_MSG);
+    }
+
+    /**
+     * @param db Shared DB manager.
+     * @return {@code True} if a checkpoint is in progress right now, {@code false} otherwise.
+     */
+    public static boolean isCheckpointNow(@Nullable IgniteCacheDatabaseSharedManager db) {
+        if (!(db instanceof GridCacheDatabaseSharedManager))
+            return false;
+
+        GridCacheDatabaseSharedManager.CheckpointProgress progress =
+            ((GridCacheDatabaseSharedManager)db).getCheckpointer().currentProgress();
+
+        if (progress == null)
+            return false;
+
+        return progress.started() && !progress.finished();
+    }
+
+    /** */
+    private IdleVerifyUtility() {
+        /* No-op. */
+    }
+}
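A caller-side sketch of the utility above (class and method names are hypothetical; the real caller is VerifyBackupPartitionsTaskV2 below). It assumes the caller already holds the node's database manager, the file page store manager, a cache group context and an AtomicBoolean flag maintained by a checkpoint listener:

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.verify.GridNotIdleException;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility;

/** Sketch: CRC check of the data pages of a single partition on an idle cluster. */
final class PartitionCrcCheckExample {
    /** */
    private PartitionCrcCheckExample() {
        // No-op.
    }

    /**
     * @param db Database shared manager of the local node.
     * @param pageStoreMgr File page store manager (may be {@code null} for in-memory groups).
     * @param grpCtx Cache group context.
     * @param partId Partition id.
     * @param cpFlag Flag flipped by a checkpoint listener when a checkpoint with dirty pages begins.
     */
    static void checkDataPages(
        IgniteCacheDatabaseSharedManager db,
        FilePageStoreManager pageStoreMgr,
        CacheGroupContext grpCtx,
        int partId,
        AtomicBoolean cpFlag
    ) throws IgniteCheckedException {
        // Bail out early if a checkpoint is already running.
        if (IdleVerifyUtility.isCheckpointNow(db))
            throw new GridNotIdleException(IdleVerifyUtility.CLUSTER_NOT_IDLE_MSG);

        // Reads every data page of the partition with CRC validation enabled;
        // throws GridNotIdleException if cpFlag is raised while reading.
        IdleVerifyUtility.checkPartitionsPageCrcSum(pageStoreMgr, grpCtx, partId, PageIdAllocator.FLAG_DATA, cpFlag);
    }
}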
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsDumpTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsDumpTask.java
index c0fd36a..2129d70 100644
  */
 package org.apache.ignite.internal.processors.cache.verify;
 
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.UUID;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -42,7 +43,6 @@ import org.apache.ignite.internal.visor.verify.VisorIdleVerifyDumpTaskArg;
 import org.apache.ignite.internal.visor.verify.VisorIdleVerifyTaskArg;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -77,8 +77,10 @@ public class VerifyBackupPartitionsDumpTask extends ComputeTaskAdapter<VisorIdle
     private IgniteLogger log;
 
     /** {@inheritDoc} */
-    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(
-        List<ClusterNode> subgrid, VisorIdleVerifyTaskArg arg) throws IgniteException {
+    @Override public @Nullable Map<? extends ComputeJob, ClusterNode> map(
+        List<ClusterNode> subgrid,
+        VisorIdleVerifyTaskArg arg
+    ) throws IgniteException {
         if (arg instanceof VisorIdleVerifyDumpTaskArg)
             taskArg = (VisorIdleVerifyDumpTaskArg)arg;
 
@@ -86,8 +88,7 @@ public class VerifyBackupPartitionsDumpTask extends ComputeTaskAdapter<VisorIdle
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public String reduce(List<ComputeJobResult> results)
-        throws IgniteException {
+    @Override public @Nullable String reduce(List<ComputeJobResult> results) throws IgniteException {
         Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes = new TreeMap<>(buildPartitionKeyComparator());
 
         for (ComputeJobResult res : results) {
@@ -123,19 +124,11 @@ public class VerifyBackupPartitionsDumpTask extends ComputeTaskAdapter<VisorIdle
     }
 
     /** {@inheritDoc} */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws
-        IgniteException {
-        ComputeJobResultPolicy superRes = super.result(res, rcvd);
-
-        // Deny failover.
-        if (superRes == ComputeJobResultPolicy.FAILOVER) {
-            superRes = ComputeJobResultPolicy.WAIT;
-
-            log.warning("VerifyBackupPartitionsJobV2 failed on node " +
-                "[consistentId=" + res.getNode().consistentId() + "]", res.getException());
-        }
-
-        return superRes;
+    @Override public ComputeJobResultPolicy result(
+        ComputeJobResult res,
+        List<ComputeJobResult> rcvd
+    ) throws IgniteException {
+        return delegate.result(res, rcvd);
     }
 
     /**
@@ -177,46 +170,18 @@ public class VerifyBackupPartitionsDumpTask extends ComputeTaskAdapter<VisorIdle
         IdleVerifyResultV2 conflictRes,
         int skippedRecords
     ) throws IgniteException {
-        File workDir = ignite.configuration().getWorkDirectory() == null
-            ? new File("/tmp")
-            : new File(ignite.configuration().getWorkDirectory());
+        String wd = ignite.configuration().getWorkDirectory();
+
+        File workDir = wd == null ? new File("/tmp") : new File(wd);
 
         File out = new File(workDir, IDLE_DUMP_FILE_PREFIX + LocalDateTime.now().format(TIME_FORMATTER) + ".txt");
 
         ignite.log().info("IdleVerifyDumpTask will write output to " + out.getAbsolutePath());
 
-        try (BufferedWriter writer = new BufferedWriter(new FileWriter(out))) {
-            try {
-
-                writer.write("idle_verify check has finished, found " + partitions.size() + " partitions\n");
+        try (PrintWriter writer = new PrintWriter(new FileWriter(out))) {
+            writeResult(partitions, conflictRes, skippedRecords, writer);
 
-                if (skippedRecords > 0)
-                    writer.write(skippedRecords + " partitions was skipped\n");
-
-                if (!F.isEmpty(partitions)) {
-                    writer.write("Cluster partitions:\n");
-
-                    for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : partitions.entrySet()) {
-                        writer.write("Partition: " + entry.getKey() + "\n");
-
-                        writer.write("Partition instances: " + entry.getValue() + "\n");
-                    }
-
-                    writer.write("\n\n-----------------------------------\n\n");
-
-                    conflictRes.print(str -> {
-                        try {
-                            writer.write(str);
-                        }
-                        catch (IOException e) {
-                            throw new IgniteException("Failed to write partitions conflict.", e);
-                        }
-                    });
-                }
-            }
-            finally {
-                writer.flush();
-            }
+            writer.flush();
 
             ignite.log().info("IdleVerifyDumpTask successfully written dump to '" + out.getAbsolutePath() + "'");
         }
@@ -229,10 +194,43 @@ public class VerifyBackupPartitionsDumpTask extends ComputeTaskAdapter<VisorIdle
         return out.getAbsolutePath();
     }
 
+    /** */
+    private void writeResult(
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> partitions,
+        IdleVerifyResultV2 conflictRes,
+        int skippedRecords,
+        PrintWriter writer
+    ) {
+        if (!F.isEmpty(conflictRes.exceptions())) {
+            int size = conflictRes.exceptions().size();
+
+            writer.write("idle_verify failed on " + size + " node" + (size == 1 ? "" : "s") + ".\n");
+        }
+
+        writer.write("idle_verify check has finished, found " + partitions.size() + " partitions\n");
+
+        if (skippedRecords > 0)
+            writer.write(skippedRecords + " partitions were skipped\n");
+
+        if (!F.isEmpty(partitions)) {
+            writer.write("Cluster partitions:\n");
+
+            for (Map.Entry<PartitionKeyV2, List<PartitionHashRecordV2>> entry : partitions.entrySet()) {
+                writer.write("Partition: " + entry.getKey() + "\n");
+
+                writer.write("Partition instances: " + entry.getValue() + "\n");
+            }
+
+            writer.write("\n\n-----------------------------------\n\n");
+
+            conflictRes.print(writer::write);
+        }
+    }
+
     /**
      * @return Comparator for {@link PartitionHashRecordV2}.
      */
-    @NotNull private Comparator<PartitionHashRecordV2> buildRecordComparator() {
+    private Comparator<PartitionHashRecordV2> buildRecordComparator() {
         return (o1, o2) -> {
             int compare = Boolean.compare(o1.isPrimary(), o2.isPrimary());
 
@@ -246,7 +244,7 @@ public class VerifyBackupPartitionsDumpTask extends ComputeTaskAdapter<VisorIdle
     /**
      * @return Comparator for {@link PartitionKeyV2}.
      */
-    @NotNull private Comparator<PartitionKeyV2> buildPartitionKeyComparator() {
+    private Comparator<PartitionKeyV2> buildPartitionKeyComparator() {
         return (o1, o2) -> {
             int compare = Integer.compare(o1.groupId(), o2.groupId());
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/VerifyBackupPartitionsTaskV2.java
index c30d37b..c5bc8d3 100644
@@ -25,19 +25,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobAdapter;
@@ -54,8 +52,14 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.lang.GridIterator;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.verify.CacheFilterEnum;
 import org.apache.ignite.internal.visor.verify.VisorIdleVerifyDumpTaskArg;
@@ -65,6 +69,9 @@ import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
+
 /**
  * Task for comparing update counters and checksums between primary and backup partitions of specified caches.
  * <br>
@@ -89,7 +96,9 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
 
     /** {@inheritDoc} */
     @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(
-        List<ClusterNode> subgrid, VisorIdleVerifyTaskArg arg) throws IgniteException {
+        List<ClusterNode> subgrid,
+        VisorIdleVerifyTaskArg arg
+    ) throws IgniteException {
         Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
 
         for (ClusterNode node : subgrid)
@@ -99,27 +108,46 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public IdleVerifyResultV2 reduce(List<ComputeJobResult> results)
-        throws IgniteException {
+    @Nullable @Override public IdleVerifyResultV2 reduce(List<ComputeJobResult> results) throws IgniteException {
         Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes = new HashMap<>();
-        Map<UUID, Exception> exceptions = new HashMap<>();
-
-        for (ComputeJobResult res : results) {
-            if (res.getException() != null) {
-                exceptions.put(res.getNode().id(), res.getException());
 
-                continue;
-            }
+        Map<ClusterNode, Exception> exceptions = new HashMap<>();
 
-            Map<PartitionKeyV2, PartitionHashRecordV2> nodeHashes = res.getData();
+        reduceResults(results, clusterHashes, exceptions);
 
-            for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> e : nodeHashes.entrySet()) {
-                List<PartitionHashRecordV2> records = clusterHashes.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
+        return checkConflicts(clusterHashes, exceptions);
+    }
 
-                records.add(e.getValue());
+    /** {@inheritDoc} */
+    @Override public ComputeJobResultPolicy result(
+        ComputeJobResult res,
+        List<ComputeJobResult> rcvd
+    ) throws IgniteException {
+        try {
+            ComputeJobResultPolicy superRes = super.result(res, rcvd);
+
+            // Deny failover.
+            if (superRes == ComputeJobResultPolicy.FAILOVER) {
+                superRes = ComputeJobResultPolicy.WAIT;
+
+                if (log != null) {
+                    log.warning("VerifyBackupPartitionsJobV2 failed on node " +
+                        "[consistentId=" + res.getNode().consistentId() + "]", res.getException());
+                }
             }
+
+            return superRes;
         }
+        catch (IgniteException e) {
+            return ComputeJobResultPolicy.WAIT;
+        }
+    }
 
+    /** */
+    private IdleVerifyResultV2 checkConflicts(
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes,
+        Map<ClusterNode, Exception> exceptions
+    ) {
         Map<PartitionKeyV2, List<PartitionHashRecordV2>> hashConflicts = new HashMap<>();
 
         Map<PartitionKeyV2, List<PartitionHashRecordV2>> updateCntrConflicts = new HashMap<>();
@@ -158,20 +186,27 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
         return new IdleVerifyResultV2(updateCntrConflicts, hashConflicts, movingParts, exceptions);
     }
 
-    /** {@inheritDoc} */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws
-        IgniteException {
-        ComputeJobResultPolicy superRes = super.result(res, rcvd);
+    /** */
+    private void reduceResults(
+        List<ComputeJobResult> results,
+        Map<PartitionKeyV2, List<PartitionHashRecordV2>> clusterHashes,
+        Map<ClusterNode, Exception> exceptions
+    ) {
+        for (ComputeJobResult res : results) {
+            if (res.getException() != null) {
+                exceptions.put(res.getNode(), res.getException());
 
-        // Deny failover.
-        if (superRes == ComputeJobResultPolicy.FAILOVER) {
-            superRes = ComputeJobResultPolicy.WAIT;
+                continue;
+            }
 
-            log.warning("VerifyBackupPartitionsJobV2 failed on node " +
-                "[consistentId=" + res.getNode().consistentId() + "]", res.getException());
-        }
+            Map<PartitionKeyV2, PartitionHashRecordV2> nodeHashes = res.getData();
+
+            for (Map.Entry<PartitionKeyV2, PartitionHashRecordV2> e : nodeHashes.entrySet()) {
+                List<PartitionHashRecordV2> records = clusterHashes.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
 
-        return superRes;
+                records.add(e.getValue());
+            }
+        }
     }
 
     /**
@@ -204,39 +239,102 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
 
         /** {@inheritDoc} */
         @Override public Map<PartitionKeyV2, PartitionHashRecordV2> execute() throws IgniteException {
-            Set<Integer> grpIds = new HashSet<>();
+            Set<Integer> grpIds = getGroupIds();
 
-            if (arg.getCaches() != null && !arg.getCaches().isEmpty()) {
-                Set<String> missingCaches = new HashSet<>();
+            completionCntr.set(0);
 
-                for (String cacheName : arg.getCaches()) {
-                    DynamicCacheDescriptor desc = ignite.context().cache().cacheDescriptor(cacheName);
+            AtomicBoolean cpFlag = new AtomicBoolean();
 
-                    if (desc == null || !isCacheMatchFilter(cacheName)) {
-                        missingCaches.add(cacheName);
+            GridCacheDatabaseSharedManager db = null;
 
-                        continue;
+            DbCheckpointListener lsnr = null;
+
+            if (arg.isCheckCrc() &&
+                ignite.context().cache().context().database() instanceof GridCacheDatabaseSharedManager) {
+                db = (GridCacheDatabaseSharedManager)ignite.context().cache().context().database();
+
+                lsnr = new DbCheckpointListener() {
+                    @Override public void onMarkCheckpointBegin(Context ctx) {
+                        /* No-op. */
                     }
 
-                    grpIds.add(desc.groupId());
-                }
+                    @Override public void onCheckpointBegin(Context ctx) {
+                        if (ctx.hasPages())
+                            cpFlag.set(true);
+                    }
+                };
 
-                handlingMissedCaches(missingCaches);
+                db.addCheckpointListener(lsnr);
             }
-            else if (onlySpecificCaches()) {
-                for (DynamicCacheDescriptor desc : ignite.context().cache().cacheDescriptors().values()) {
-                    if (desc.cacheConfiguration().getCacheMode() != CacheMode.LOCAL
-                        && isCacheMatchFilter(desc.cacheName()))
-                        grpIds.add(desc.groupId());
+
+            try {
+                if (arg.isCheckCrc() && IdleVerifyUtility.isCheckpointNow(db))
+                    throw new GridNotIdleException(IdleVerifyUtility.CLUSTER_NOT_IDLE_MSG);
+
+                List<Future<Map<PartitionKeyV2, PartitionHashRecordV2>>> partHashCalcFuts =
+                    calcPartitionHashAsync(grpIds, cpFlag);
+
+                Map<PartitionKeyV2, PartitionHashRecordV2> res = new HashMap<>();
+
+                List<IgniteException> exceptions = new ArrayList<>();
+
+                long lastProgressLogTs = U.currentTimeMillis();
+
+                for (int i = 0; i < partHashCalcFuts.size(); ) {
+                    Future<Map<PartitionKeyV2, PartitionHashRecordV2>> fut = partHashCalcFuts.get(i);
+
+                    try {
+                        Map<PartitionKeyV2, PartitionHashRecordV2> partHash = fut.get(100, TimeUnit.MILLISECONDS);
+
+                        res.putAll(partHash);
+
+                        i++;
+                    }
+                    catch (InterruptedException | ExecutionException e) {
+                        if (e.getCause() instanceof IgniteException && !(e.getCause() instanceof GridNotIdleException)) {
+                            exceptions.add((IgniteException)e.getCause());
+
+                            i++;
+
+                            continue;
+                        }
+
+                        for (int j = i + 1; j < partHashCalcFuts.size(); j++)
+                            partHashCalcFuts.get(j).cancel(false);
+
+                        if (e instanceof InterruptedException)
+                            throw new IgniteInterruptedException((InterruptedException)e);
+                        else
+                            throw new IgniteException(e.getCause());
+                    }
+                    catch (TimeoutException ignored) {
+                        if (U.currentTimeMillis() - lastProgressLogTs > 3 * 60 * 1000L) {
+                            lastProgressLogTs = U.currentTimeMillis();
+
+                            log.warning("idle_verify is still running, processed " + completionCntr.get() + " of " +
+                                partHashCalcFuts.size() + " local partitions");
+                        }
+                    }
                 }
+
+                if (!F.isEmpty(exceptions))
+                    throw new IdleVerifyException(exceptions);
+
+                return res;
             }
-            else
-                grpIds = getCacheGroupIds();
+            finally {
+                if (db != null && lsnr != null)
+                    db.removeCheckpointListener(lsnr);
+            }
+        }
 
+        /** */
+        private List<Future<Map<PartitionKeyV2, PartitionHashRecordV2>>> calcPartitionHashAsync(
+            Set<Integer> grpIds,
+            AtomicBoolean cpFlag
+        ) {
             List<Future<Map<PartitionKeyV2, PartitionHashRecordV2>>> partHashCalcFutures = new ArrayList<>();
 
-            completionCntr.set(0);
-
             for (Integer grpId : grpIds) {
                 CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);
 
@@ -246,45 +344,43 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
                 List<GridDhtLocalPartition> parts = grpCtx.topology().localPartitions();
 
                 for (GridDhtLocalPartition part : parts)
-                    partHashCalcFutures.add(calculatePartitionHashAsync(grpCtx, part));
+                    partHashCalcFutures.add(calculatePartitionHashAsync(grpCtx, part, cpFlag));
             }
 
-            Map<PartitionKeyV2, PartitionHashRecordV2> res = new HashMap<>();
+            return partHashCalcFutures;
+        }
+
+        /** */
+        private Set<Integer> getGroupIds() {
+            Set<Integer> grpIds = new HashSet<>();
 
-            long lastProgressLogTs = U.currentTimeMillis();
+            if (arg.getCaches() != null && !arg.getCaches().isEmpty()) {
+                Set<String> missingCaches = new HashSet<>();
 
-            for (int i = 0; i < partHashCalcFutures.size(); ) {
-                Future<Map<PartitionKeyV2, PartitionHashRecordV2>> fut = partHashCalcFutures.get(i);
+                for (String cacheName : arg.getCaches()) {
+                    DynamicCacheDescriptor desc = ignite.context().cache().cacheDescriptor(cacheName);
 
-                try {
-                    Map<PartitionKeyV2, PartitionHashRecordV2> partHash = fut.get(100, TimeUnit.MILLISECONDS);
+                    if (desc == null || !isCacheMatchFilter(cacheName)) {
+                        missingCaches.add(cacheName);
 
-                    res.putAll(partHash);
+                        continue;
+                    }
 
-                    i++;
-                }
-                catch (InterruptedException | ExecutionException e) {
-                    for (int j = i + 1; j < partHashCalcFutures.size(); j++)
-                        partHashCalcFutures.get(j).cancel(false);
-
-                    if (e instanceof InterruptedException)
-                        throw new IgniteInterruptedException((InterruptedException)e);
-                    else if (e.getCause() instanceof IgniteException)
-                        throw (IgniteException)e.getCause();
-                    else
-                        throw new IgniteException(e.getCause());
+                    grpIds.add(desc.groupId());
                 }
-                catch (TimeoutException ignored) {
-                    if (U.currentTimeMillis() - lastProgressLogTs > 3 * 60 * 1000L) {
-                        lastProgressLogTs = U.currentTimeMillis();
 
-                        log.warning("idle_verify is still running, processed " + completionCntr.get() + " of " +
-                            partHashCalcFutures.size() + " local partitions");
-                    }
+                handlingMissedCaches(missingCaches);
+            }
+            else if (onlySpecificCaches()) {
+                for (DynamicCacheDescriptor desc : ignite.context().cache().cacheDescriptors().values()) {
+                    if (desc.cacheConfiguration().getCacheMode() != LOCAL && isCacheMatchFilter(desc.cacheName()))
+                        grpIds.add(desc.groupId());
                 }
             }
+            else
+                grpIds = getCacheGroupIds();
 
-            return res;
+            return grpIds;
         }
 
         /**
@@ -295,11 +391,12 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
 
             Set<Integer> grpIds = new HashSet<>();
 
-            if (arg.excludeCaches() == null || arg.excludeCaches().isEmpty()) {
+            if (F.isEmpty(arg.getExcludeCaches())) {
                 for (CacheGroupContext grp : groups) {
                     if (!grp.systemCache() && !grp.isLocal())
                         grpIds.add(grp.groupId());
                 }
+
                 return grpIds;
             }
 
@@ -315,11 +412,11 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
          * @param grp Group.
          */
         private boolean isGrpExcluded(CacheGroupContext grp) {
-            if (arg.excludeCaches().contains(grp.name()))
+            if (arg.getExcludeCaches().contains(grp.name()))
                 return true;
 
             for (GridCacheContext cacheCtx : grp.caches()) {
-                if (arg.excludeCaches().contains(cacheCtx.name()))
+                if (arg.getExcludeCaches().contains(cacheCtx.name()))
                     return true;
             }
 
@@ -327,7 +424,7 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
         }
 
         /**
-         *  Checks and throw exception if caches was missed.
+         * Checks and throws an exception if any caches are missing.
          *
          * @param missingCaches Missing caches.
          */
@@ -335,22 +432,20 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
             if (missingCaches.isEmpty())
                 return;
 
-            StringBuilder strBuilder = new StringBuilder("The following caches do not exist");
+            SB strBuilder = new SB("The following caches do not exist");
 
             if (onlySpecificCaches()) {
                 VisorIdleVerifyDumpTaskArg vdta = (VisorIdleVerifyDumpTaskArg)arg;
 
-                strBuilder.append(" or do not match to the given filter [")
-                    .append(vdta.getCacheFilterEnum())
-                    .append("]: ");
+                strBuilder.a(" or do not match to the given filter [").a(vdta.getCacheFilterEnum()).a("]: ");
             }
             else
-                strBuilder.append(": ");
+                strBuilder.a(": ");
 
             for (String name : missingCaches)
-                strBuilder.append(name).append(", ");
+                strBuilder.a(name).a(", ");
 
-            strBuilder.delete(strBuilder.length() - 2, strBuilder.length());
+            strBuilder.d(strBuilder.length() - 2, strBuilder.length());
 
             throw new IgniteException(strBuilder.toString());
         }
@@ -373,9 +468,12 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
          */
         private boolean isCacheMatchFilter(String cacheName) {
             if (arg instanceof VisorIdleVerifyDumpTaskArg) {
-                DataStorageConfiguration dsc = ignite.context().config().getDataStorageConfiguration();
+                DataStorageConfiguration dsCfg = ignite.context().config().getDataStorageConfiguration();
+
                 DynamicCacheDescriptor desc = ignite.context().cache().cacheDescriptor(cacheName);
+
                 CacheConfiguration cc = desc.cacheConfiguration();
+
                 VisorIdleVerifyDumpTaskArg vdta = (VisorIdleVerifyDumpTaskArg)arg;
 
                 switch (vdta.getCacheFilterEnum()) {
@@ -383,16 +481,16 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
                         return !desc.cacheType().userCache();
 
                     case NOT_PERSISTENT:
-                        return desc.cacheType().userCache() && !GridCacheUtils.isPersistentCache(cc, dsc);
+                        return desc.cacheType().userCache() && !GridCacheUtils.isPersistentCache(cc, dsCfg);
 
                     case PERSISTENT:
-                        return desc.cacheType().userCache() && GridCacheUtils.isPersistentCache(cc, dsc);
+                        return desc.cacheType().userCache() && GridCacheUtils.isPersistentCache(cc, dsCfg);
 
                     case ALL:
                         break;
 
                     default:
-                        assert false: "Illegal cache filter: " + vdta.getCacheFilterEnum();
+                        assert false : "Illegal cache filter: " + vdta.getCacheFilterEnum();
                 }
             }
 
@@ -402,25 +500,25 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
         /**
          * @param grpCtx Group context.
          * @param part Local partition.
+         * @param cpFlag Checkpoint flag.
          */
         private Future<Map<PartitionKeyV2, PartitionHashRecordV2>> calculatePartitionHashAsync(
             final CacheGroupContext grpCtx,
-            final GridDhtLocalPartition part
+            final GridDhtLocalPartition part,
+            AtomicBoolean cpFlag
         ) {
-            return ForkJoinPool.commonPool().submit(new Callable<Map<PartitionKeyV2, PartitionHashRecordV2>>() {
-                @Override public Map<PartitionKeyV2, PartitionHashRecordV2> call() throws Exception {
-                    return calculatePartitionHash(grpCtx, part);
-                }
-            });
+            return ForkJoinPool.commonPool().submit(() -> calculatePartitionHash(grpCtx, part, cpFlag));
         }
 
         /**
          * @param grpCtx Group context.
          * @param part Local partition.
+         * @param cpFlag Checkpoint flag.
          */
         private Map<PartitionKeyV2, PartitionHashRecordV2> calculatePartitionHash(
             CacheGroupContext grpCtx,
-            GridDhtLocalPartition part
+            GridDhtLocalPartition part,
+            AtomicBoolean cpFlag
         ) {
             if (!part.reserve())
                 return Collections.emptyMap();
@@ -447,6 +545,9 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
 
                 partSize = part.dataStore().fullSize();
 
+                if (arg.isCheckCrc())
+                    checkPartitionCrc(grpCtx, part, cpFlag);
+
                 GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id());
 
                 while (it.hasNextX()) {
@@ -460,7 +561,7 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
                 long updateCntrAfter = part.updateCounter();
 
                 if (updateCntrBefore != updateCntrAfter) {
-                    throw new IgniteException("Cluster is not idle: update counter of partition [grpId=" +
+                    throw new GridNotIdleException("Update counter of partition [grpId=" +
                         grpCtx.groupId() + ", partId=" + part.id() + "] changed during hash calculation [before=" +
                         updateCntrBefore + ", after=" + updateCntrAfter + "]");
                 }
@@ -469,18 +570,59 @@ public class VerifyBackupPartitionsTaskV2 extends ComputeTaskAdapter<VisorIdleVe
                 U.error(log, "Can't calculate partition hash [grpId=" + grpCtx.groupId() +
                     ", partId=" + part.id() + "]", e);
 
-                return Collections.emptyMap();
+                throw new IgniteException("Can't calculate partition hash [grpId=" + grpCtx.groupId() +
+                    ", partId=" + part.id() + "]", e);
             }
             finally {
                 part.release();
             }
 
-            PartitionHashRecordV2 partRec = new PartitionHashRecordV2(
-                partKey, isPrimary, consId, partHash, updateCntrBefore, partSize);
+            PartitionHashRecordV2 partRec =
+                new PartitionHashRecordV2(partKey, isPrimary, consId, partHash, updateCntrBefore, partSize);
 
             completionCntr.incrementAndGet();
 
             return Collections.singletonMap(partKey, partRec);
         }
+
+        /**
+         * Checks that CRC sums of stored pages are correct for the given partition and cache group.
+         *
+         * @param grpCtx Cache group context.
+         * @param part Partition.
+         * @param cpFlag Checkpoint flag.
+         */
+        private void checkPartitionCrc(CacheGroupContext grpCtx, GridDhtLocalPartition part, AtomicBoolean cpFlag) {
+            if (grpCtx.persistenceEnabled()) {
+                FilePageStore pageStore = null;
+
+                try {
+                    FilePageStoreManager pageStoreMgr =
+                        (FilePageStoreManager)ignite.context().cache().context().pageStore();
+
+                    if (pageStoreMgr == null)
+                        return;
+
+                    pageStore = (FilePageStore)pageStoreMgr.getStore(grpCtx.groupId(), part.id());
+
+                    IdleVerifyUtility.checkPartitionsPageCrcSum(pageStore, grpCtx, part.id(), FLAG_DATA, cpFlag);
+                }
+                catch (GridNotIdleException e) {
+                    throw e;
+                }
+                catch (Exception | AssertionError e) {
+                    if (cpFlag.get())
+                        throw new GridNotIdleException("Checkpoint with dirty pages started! Cluster not idle!", e);
+
+                    String msg = new SB("CRC check of partition: ").a(part.id()).a(", for cache group ")
+                        .a(grpCtx.cacheOrGroupName()).a(" failed.")
+                        .a(pageStore != null ? " file: " + pageStore.getFileAbsolutePath() : "").toString();
+
+                    log.error(msg, e);
+
+                    throw new IgniteException(msg, e);
+                }
+            }
+        }
     }
 }
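
The page-level verification in checkPartitionCrc() above is delegated to the new IdleVerifyUtility.checkPartitionsPageCrcSum(...), whose body is not part of this hunk. Conceptually it does what the code removed from ValidateIndexesClosure further below used to do inline: walk every page of the partition's page store and read it back, so that a page with a broken CRC surfaces as an exception. The helper below is a hedged sketch of that idea, not the real utility:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.pagemem.PageIdUtils;
    import org.apache.ignite.internal.pagemem.store.PageStore;

    /** Illustrative page scan: a full read of a partition store fails if any stored page CRC is broken. */
    class PageCrcScanSketch {
        /**
         * @param pageStore Page store of the partition to scan.
         * @param partId Partition id.
         * @param flag Page flag (data or index partition).
         * @param pageSize Configured page size in bytes.
         */
        static void scan(PageStore pageStore, int partId, byte flag, int pageSize) throws IgniteCheckedException {
            long pageId = PageIdUtils.pageId(partId, flag, 0);

            ByteBuffer buf = ByteBuffer.allocateDirect(pageSize).order(ByteOrder.nativeOrder());

            for (int pageNo = 0; pageNo < pageStore.pages(); pageId++, pageNo++) {
                buf.clear();

                // As in the code this patch removes from ValidateIndexesClosure, reading through the
                // page store validates the stored CRC of each page and throws on a mismatch.
                pageStore.read(pageId, buf, true);
            }
        }
    }

With the cpFlag plumbing shown above, such a failure is reported as GridNotIdleException when a concurrent checkpoint made the read unreliable, and as a plain IgniteException otherwise.
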
index 3c836a4..8b92fbf 100644 (file)
@@ -48,9 +48,16 @@ public class VisorIdleVerifyDumpTaskArg extends VisorIdleVerifyTaskArg {
      * @param excludeCaches Caches to exclude.
      * @param skipZeros Skip zeros partitions.
      * @param cacheFilterEnum Cache kind.
+     * @param checkCrc Check partition CRC sum.
      */
-    public VisorIdleVerifyDumpTaskArg(Set<String> caches, Set<String> excludeCaches, boolean skipZeros, CacheFilterEnum cacheFilterEnum) {
-        super(caches, excludeCaches);
+    public VisorIdleVerifyDumpTaskArg(
+        Set<String> caches,
+        Set<String> excludeCaches,
+        boolean skipZeros,
+        CacheFilterEnum cacheFilterEnum,
+        boolean checkCrc
+    ) {
+        super(caches, excludeCaches, checkCrc);
         this.skipZeros = skipZeros;
         this.cacheFilterEnum = cacheFilterEnum;
     }
@@ -75,28 +82,65 @@ public class VisorIdleVerifyDumpTaskArg extends VisorIdleVerifyTaskArg {
 
         out.writeBoolean(skipZeros);
 
-        U.writeEnum(out, cacheFilterEnum);
+        /**
+         * Since protocol version 2 the new instance fields must be written at the end of the output object. This is
+         * needed to keep backward compatibility in extended (child) classes.
+         *
+         * TODO: https://issues.apache.org/jira/browse/IGNITE-10932 Will be removed in 3.0.
+         */
+        if (instanceOfCurrentClass()) {
+            U.writeEnum(out, cacheFilterEnum);
+
+            U.writeCollection(out, getExcludeCaches());
+
+            out.writeBoolean(isCheckCrc());
+        }
     }
 
     /** {@inheritDoc} */
-    @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
+    @Override protected void readExternalData(
+        byte protoVer,
+        ObjectInput in
+    ) throws IOException, ClassNotFoundException {
         super.readExternalData(protoVer, in);
 
         skipZeros = in.readBoolean();
 
-        if (protoVer >= V2)
-            cacheFilterEnum = CacheFilterEnum.fromOrdinal(in.readByte());
-        else
-            cacheFilterEnum = CacheFilterEnum.ALL;
+        /**
+         * Since protocol version 2 the new instance fields must be read from the end of the input object. This is
+         * needed to keep backward compatibility in extended (child) classes.
+         *
+         * TODO: https://issues.apache.org/jira/browse/IGNITE-10932 Will be removed in 3.0.
+         */
+        if (instanceOfCurrentClass()) {
+            if (protoVer >= V2)
+                cacheFilterEnum = CacheFilterEnum.fromOrdinal(in.readByte());
+            else
+                cacheFilterEnum = CacheFilterEnum.ALL;
+
+            if (protoVer >= V2)
+                excludeCaches(U.readSet(in));
+
+            if (protoVer >= V3)
+                checkCrc(in.readBoolean());
+        }
     }
 
     /** {@inheritDoc} */
     @Override public byte getProtocolVersion() {
-        return V2;
+        return (byte)Math.max(V2, super.getProtocolVersion());
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(VisorIdleVerifyDumpTaskArg.class, this);
     }
+
+    /**
+     * @return {@code True} if this object is an instance of the current class (not a child class) and {@code False}
+     * if it is an instance of an extended (i.e. child) class.
+     */
+    private boolean instanceOfCurrentClass() {
+        return VisorIdleVerifyDumpTaskArg.class == getClass();
+    }
 }
index a8dc697..d4652cc 100644 (file)
@@ -49,8 +49,11 @@ class VisorIdleVerifyJob<ResultT> extends VisorJob<VisorIdleVerifyTaskArg, Resul
      * @param debug Debug.
      * @param taskCls Task class for execution.
      */
-    VisorIdleVerifyJob(VisorIdleVerifyTaskArg arg, boolean debug,
-        Class<? extends ComputeTask<VisorIdleVerifyTaskArg, ResultT>> taskCls) {
+    VisorIdleVerifyJob(
+        VisorIdleVerifyTaskArg arg,
+        boolean debug,
+        Class<? extends ComputeTask<VisorIdleVerifyTaskArg, ResultT>> taskCls
+    ) {
         super(arg, debug);
         this.taskCls = taskCls;
     }
@@ -63,11 +66,7 @@ class VisorIdleVerifyJob<ResultT> extends VisorJob<VisorIdleVerifyTaskArg, Resul
             if (!fut.isDone()) {
                 jobCtx.holdcc();
 
-                fut.listen(new IgniteInClosure<IgniteFuture<ResultT>>() {
-                    @Override public void apply(IgniteFuture<ResultT> f) {
-                        jobCtx.callcc();
-                    }
-                });
+                fut.listen((IgniteInClosure<IgniteFuture<ResultT>>)f -> jobCtx.callcc());
 
                 return null;
             }
index d645fec..2bcf4bf 100644 (file)
@@ -26,7 +26,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorDataTransferObject;
 
 /**
- * Arguments for task {@link VisorIdleVerifyTask}
+ * Arguments for task {@link VisorIdleVerifyTask}.
+ * <br/>
  */
 public class VisorIdleVerifyTaskArg extends VisorDataTransferObject {
     /** */
@@ -38,6 +39,9 @@ public class VisorIdleVerifyTaskArg extends VisorDataTransferObject {
     /** Exclude caches or groups. */
     private Set<String> excludeCaches;
 
+    /** Check CRC sums of stored pages. */
+    private boolean checkCrc;
+
     /**
      * Default constructor.
      */
@@ -48,10 +52,21 @@ public class VisorIdleVerifyTaskArg extends VisorDataTransferObject {
     /**
      * @param caches Caches.
      * @param excludeCaches Exclude caches or group.
+     * @param checkCrc Check CRC sums of pages stored on disk.
      */
-    public VisorIdleVerifyTaskArg(Set<String> caches, Set<String> excludeCaches) {
+    public VisorIdleVerifyTaskArg(Set<String> caches, Set<String> excludeCaches, boolean checkCrc) {
         this.caches = caches;
         this.excludeCaches = excludeCaches;
+        this.checkCrc = checkCrc;
+    }
+
+    /**
+     * @param caches Caches.
+     * @param checkCrc Check CRC sums of pages stored on disk.
+     */
+    public VisorIdleVerifyTaskArg(Set<String> caches, boolean checkCrc) {
+        this.caches = caches;
+        this.checkCrc = checkCrc;
     }
 
     /**
@@ -59,9 +74,12 @@ public class VisorIdleVerifyTaskArg extends VisorDataTransferObject {
      */
     public VisorIdleVerifyTaskArg(Set<String> caches) {
         this.caches = caches;
-        this.excludeCaches = excludeCaches;
     }
 
+    /** */
+    public boolean isCheckCrc() {
+        return checkCrc;
+    }
 
     /**
      * @return Caches.
@@ -73,31 +91,74 @@ public class VisorIdleVerifyTaskArg extends VisorDataTransferObject {
     /**
      * @return Exclude caches or groups.
      */
-    public Set<String> excludeCaches() {
+    public Set<String> getExcludeCaches() {
         return excludeCaches;
     }
 
     /** {@inheritDoc} */
     @Override public byte getProtocolVersion() {
-        return V2;
+        return V3;
     }
 
     /** {@inheritDoc} */
     @Override protected void writeExternalData(ObjectOutput out) throws IOException {
         U.writeCollection(out, caches);
-        U.writeCollection(out, excludeCaches);
+
+        /**
+         * Instance fields added since protocol version 2 must be serialized here if, and only if, this object is not
+         * an instance of a child class. Otherwise, these fields must be serialized by the child class.
+         *
+         * TODO: https://issues.apache.org/jira/browse/IGNITE-10932 Will be removed in 3.0.
+         */
+        if (instanceOfCurrentClass()) {
+            U.writeCollection(out, excludeCaches);
+
+            out.writeBoolean(checkCrc);
+        }
     }
 
     /** {@inheritDoc} */
-    @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException {
+    @Override protected void readExternalData(
+        byte protoVer,
+        ObjectInput in
+    ) throws IOException, ClassNotFoundException {
         caches = U.readSet(in);
 
-        if (protoVer >= V2)
-            excludeCaches = U.readSet(in);
+        /**
+         * Instance fields added since protocol version 2 must be deserialized here if, and only if, this object is
+         * not an instance of a child class. Otherwise, these fields must be deserialized by the child class.
+         *
+         * TODO: https://issues.apache.org/jira/browse/IGNITE-10932 Will be removed in 3.0.
+         */
+        if (instanceOfCurrentClass()) {
+            if (protoVer >= V2)
+                excludeCaches = U.readSet(in);
+
+            if (protoVer >= V3)
+                checkCrc = in.readBoolean();
+        }
+    }
+
+    /** */
+    protected void excludeCaches(Set<String> excludeCaches) {
+        this.excludeCaches = excludeCaches;
+    }
+
+    /** */
+    protected void checkCrc(boolean checkCrc) {
+        this.checkCrc = checkCrc;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(VisorIdleVerifyTaskArg.class, this);
     }
+
+    /**
+     * @return {@code True} if this object is an instance of the current class (not a child class) and {@code False}
+     * if it is an instance of an extended (i.e. child) class.
+     */
+    private boolean instanceOfCurrentClass() {
+        return VisorIdleVerifyTaskArg.class == getClass();
+    }
 }
index b9250ef..6deaf07 100644 (file)
 
 package org.apache.ignite.internal.visor.verify;
 
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.compute.ComputeTaskFuture;
 import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
 import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2;
 import org.apache.ignite.internal.processors.task.GridInternal;
-import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.internal.visor.VisorOneNodeTask;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.resources.JobContextResource;
 
 /**
  * Task to verify checksums of backup partitions.
@@ -40,55 +33,6 @@ public class VisorIdleVerifyTaskV2 extends VisorOneNodeTask<VisorIdleVerifyTaskA
 
     /** {@inheritDoc} */
     @Override protected VisorJob<VisorIdleVerifyTaskArg, IdleVerifyResultV2> job(VisorIdleVerifyTaskArg arg) {
-        return new VisorIdleVerifyJobV2(arg, debug);
-    }
-
-    /**
-     *
-     */
-    private static class VisorIdleVerifyJobV2 extends VisorJob<VisorIdleVerifyTaskArg, IdleVerifyResultV2> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private ComputeTaskFuture<IdleVerifyResultV2> fut;
-
-        /** Auto-inject job context. */
-        @JobContextResource
-        protected transient ComputeJobContext jobCtx;
-
-        /**
-         * @param arg Argument.
-         * @param debug Debug.
-         */
-        private VisorIdleVerifyJobV2(VisorIdleVerifyTaskArg arg, boolean debug) {
-            super(arg, debug);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected IdleVerifyResultV2 run(VisorIdleVerifyTaskArg arg) throws IgniteException {
-            if (fut == null) {
-                fut = ignite.compute().executeAsync(VerifyBackupPartitionsTaskV2.class, arg);
-
-                if (!fut.isDone()) {
-                    jobCtx.holdcc();
-
-                    fut.listen(new IgniteInClosure<IgniteFuture<IdleVerifyResultV2>>() {
-                        @Override public void apply(IgniteFuture<IdleVerifyResultV2> f) {
-                            jobCtx.callcc();
-                        }
-                    });
-
-                    return null;
-                }
-            }
-
-            return fut.get();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(VisorIdleVerifyJobV2.class, this);
-        }
+        return new VisorIdleVerifyJob<>(arg, debug, VerifyBackupPartitionsTaskV2.class);
     }
 }
index 6944e21..6e29340 100644 (file)
@@ -185,7 +185,8 @@ public class PageMemoryTracker implements IgnitePlugin {
                     cleanupPages(fullPageId -> fullPageId.groupId() == grp.groupId());
                 }
 
-                @Override public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException {
+                @Override
+                public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException {
                     super.onPartitionDestroyed(grpId, partId, tag);
 
                     cleanupPages(fullPageId -> fullPageId.groupId() == grpId
@@ -212,7 +213,7 @@ public class PageMemoryTracker implements IgnitePlugin {
 
         Mockito.doReturn(pageSize).when(pageMemoryMock).pageSize();
         Mockito.when(pageMemoryMock.realPageSize(Mockito.anyInt())).then(mock -> {
-            int grpId = (Integer) mock.getArguments()[0];
+            int grpId = (Integer)mock.getArguments()[0];
 
             if (gridCtx.encryption().groupKey(grpId) == null)
                 return pageSize;
@@ -247,9 +248,15 @@ public class PageMemoryTracker implements IgnitePlugin {
         freeSlotsCnt = maxPages;
 
         if (cfg.isCheckPagesOnCheckpoint()) {
-            checkpointLsnr = ctx -> {
-                if (!checkPages(false))
-                    throw new IgniteCheckedException("Page memory is inconsistent after applying WAL delta records.");
+            checkpointLsnr = new DbCheckpointListener() {
+                @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
+                    if (!checkPages(false))
+                        throw new IgniteCheckedException("Page memory is inconsistent after applying WAL delta records.");
+                }
+
+                @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
+                    /* No-op. */
+                }
             };
 
             ((GridCacheDatabaseSharedManager)gridCtx.cache().context().database()).addCheckpointListener(checkpointLsnr);
@@ -298,7 +305,6 @@ public class PageMemoryTracker implements IgnitePlugin {
         return (cfg != null && cfg.isEnabled() && CU.isPersistenceEnabled(ctx.igniteConfiguration()));
     }
 
-
     /**
      * Cleanup pages by predicate.
      *
index e1306f5..833a71e 100644 (file)
@@ -89,6 +89,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.util.GridBusyLock;
@@ -2131,4 +2132,14 @@ public final class GridTestUtils {
 
         return DriverManager.getConnection(connStr);
     }
+
+    /**
+     * Removes idle_verify log files created in tests.
+     */
+    public static void cleanIdleVerifyLogFiles() {
+        File dir = new File(".");
+
+        for (File f : dir.listFiles(n -> n.getName().startsWith(IdleVerifyResultV2.IDLE_VERIFY_FILE_PREFIX)))
+            f.delete();
+    }
 }
index 30a532f..29d40d2 100644 (file)
@@ -21,6 +21,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -36,6 +38,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
@@ -151,6 +154,13 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        GridTestUtils.cleanIdleVerifyLogFiles();
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         System.setProperty(IGNITE_ENABLE_EXPERIMENTAL_COMMAND, "true");
 
@@ -160,7 +170,7 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest {
 
         sysOut = System.out;
 
-        testOut = new ByteArrayOutputStream(128 * 1024);
+        testOut = new ByteArrayOutputStream(1024 * 1024);
     }
 
     /** {@inheritDoc} */
@@ -1223,6 +1233,83 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest {
             checkExceptionMessageOnReport(unstableId);
     }
 
+    /** */
+    @Test
+    public void testCacheIdleVerifyCrcWithCorruptedPartition() throws Exception {
+        testCacheIdleVerifyWithCorruptedPartition("--cache", "idle_verify", "--check-crc");
+
+        String out = testOut.toString();
+
+        assertTrue(out.contains("idle_verify failed on 1 node."));
+        assertTrue(out.contains("See log for additional information."));
+    }
+
+    /** */
+    @Test
+    public void testCacheIdleVerifyDumpCrcWithCorruptedPartition() throws Exception {
+        testCacheIdleVerifyWithCorruptedPartition("--cache", "idle_verify", "--dump", "--check-crc");
+
+        String[] parts = testOut.toString().split("VisorIdleVerifyDumpTask successfully written output to '");
+
+        assertEquals(2, parts.length);
+
+        String dumpFile = parts[1].split("\\.")[0] + ".txt";
+
+        for (String line : Files.readAllLines(new File(dumpFile).toPath()))
+            System.out.println(line);
+
+        String outputStr = testOut.toString();
+
+        assertTrue(outputStr.contains("idle_verify failed on 1 node."));
+        assertTrue(outputStr.contains("idle_verify check has finished, no conflicts have been found."));
+    }
+
+    /** */
+    private void corruptPartition(File partitionsDir) throws IOException {
+        ThreadLocalRandom rand = ThreadLocalRandom.current();
+
+        for (File partFile : partitionsDir.listFiles((d, n) -> n.startsWith("part"))) {
+            try (RandomAccessFile raf = new RandomAccessFile(partFile, "rw")) {
+                byte[] buf = new byte[1024];
+
+                rand.nextBytes(buf);
+
+                raf.seek(4096 * 2 + 1);
+
+                raf.write(buf);
+            }
+        }
+    }
+
+    /** */
+    private void testCacheIdleVerifyWithCorruptedPartition(String... args) throws Exception {
+        Ignite ignite = startGrids(2);
+
+        ignite.cluster().active(true);
+
+        createCacheAndPreload(ignite, 1000);
+
+        Serializable consistId = ignite.configuration().getConsistentId();
+
+        File partitionsDir = U.resolveWorkDirectory(
+            ignite.configuration().getWorkDirectory(),
+            "db/" + consistId + "/cache-" + DEFAULT_CACHE_NAME,
+            false
+        );
+
+        stopGrid(0);
+
+        corruptPartition(partitionsDir);
+
+        startGrid(0);
+
+        awaitPartitionMapExchange();
+
+        injectTestSystemOut();
+
+        assertEquals(EXIT_CODE_OK, execute(args));
+    }
+
     /**
      * Creates default cache and preload some data entries.
      *
@@ -1253,9 +1340,7 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest {
 
             assertTrue(dumpWithConflicts.contains("Idle verify failed on nodes:"));
 
-            assertTrue(dumpWithConflicts.contains("Node ID: " + unstableNodeId + "\n" +
-                "Exception message:\n" +
-                "Node has left grid: " + unstableNodeId));
+            assertTrue(dumpWithConflicts.contains("Node ID: " + unstableNodeId));
         }
         else
             fail("Should be found dump with conflicts");
@@ -1416,7 +1501,7 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest {
 
         injectTestSystemOut();
 
-        assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify", "--dump", "--excludeCaches", "shared_grp"));
+        assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify", "--dump", "--exclude-caches", "shared_grp"));
 
         Matcher fileNameMatcher = dumpFileNameMatcher();
 
@@ -1461,7 +1546,7 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest {
 
         injectTestSystemOut();
 
-        assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify", "--dump", "--excludeCaches", DEFAULT_CACHE_NAME
+        assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify", "--dump", "--exclude-caches", DEFAULT_CACHE_NAME
             + "," + DEFAULT_CACHE_NAME + "_second"));
 
         Matcher fileNameMatcher = dumpFileNameMatcher();
index 4b68a42..9f2cda1 100644 (file)
@@ -18,8 +18,6 @@ package org.apache.ignite.internal.visor.verify;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,6 +31,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
@@ -41,9 +40,6 @@ import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
-import org.apache.ignite.internal.pagemem.PageIdUtils;
-import org.apache.ignite.internal.pagemem.store.PageStore;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
@@ -55,7 +51,12 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.verify.GridNotIdleException;
+import org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility;
 import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
@@ -76,6 +77,9 @@ import org.h2.engine.Session;
 import org.h2.index.Cursor;
 import org.h2.index.Index;
 
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+
 /**
  * Closure that locally validates indexes of given caches.
  * Validation consists of three checks:
@@ -154,7 +158,7 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
     /**
      *
      */
-    private VisorValidateIndexesJobResult call0() throws Exception {
+    private VisorValidateIndexesJobResult call0() {
         Set<Integer> grpIds = new HashSet<>();
 
         Set<String> missingCaches = new HashSet<>();
@@ -295,31 +299,56 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
     private Map<Integer, IndexIntegrityCheckIssue> integrityCheckIndexesPartitions(Set<Integer> grpIds) {
         List<Future<T2<Integer, IndexIntegrityCheckIssue>>> integrityCheckFutures = new ArrayList<>(grpIds.size());
 
-        for (Integer grpId: grpIds) {
-            final CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);
+        Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = new HashMap<>();
 
-            if (grpCtx == null || !grpCtx.persistenceEnabled()) {
-                integrityCheckedIndexes.incrementAndGet();
+        int curFut = 0;
 
-                continue;
+        IgniteCacheDatabaseSharedManager db = ignite.context().cache().context().database();
+
+        DbCheckpointListener lsnr = null;
+
+        try {
+            AtomicBoolean cpFlag = new AtomicBoolean();
+
+            if (db instanceof GridCacheDatabaseSharedManager) {
+                lsnr = new DbCheckpointListener() {
+                    @Override public void onMarkCheckpointBegin(Context ctx) {
+                        /* No-op. */
+                    }
+
+                    @Override public void onCheckpointBegin(Context ctx) {
+                        if (ctx.hasPages())
+                            cpFlag.set(true);
+                    }
+                };
+
+                ((GridCacheDatabaseSharedManager)db).addCheckpointListener(lsnr);
+
+                if (IdleVerifyUtility.isCheckpointNow(db))
+                    throw new GridNotIdleException(IdleVerifyUtility.CLUSTER_NOT_IDLE_MSG);
             }
 
-            Future<T2<Integer, IndexIntegrityCheckIssue>> checkFut =
-                    calcExecutor.submit(new Callable<T2<Integer, IndexIntegrityCheckIssue>>() {
-                        @Override public T2<Integer, IndexIntegrityCheckIssue> call() throws Exception {
-                            IndexIntegrityCheckIssue issue = integrityCheckIndexPartition(grpCtx);
+            for (Integer grpId: grpIds) {
+                final CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId);
 
-                            return new T2<>(grpCtx.groupId(), issue);
-                        }
-                    });
+                if (grpCtx == null || !grpCtx.persistenceEnabled()) {
+                    integrityCheckedIndexes.incrementAndGet();
 
-            integrityCheckFutures.add(checkFut);
-        }
+                    continue;
+                }
 
-        Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = new HashMap<>();
+                Future<T2<Integer, IndexIntegrityCheckIssue>> checkFut =
+                        calcExecutor.submit(new Callable<T2<Integer, IndexIntegrityCheckIssue>>() {
+                            @Override public T2<Integer, IndexIntegrityCheckIssue> call() throws Exception {
+                                IndexIntegrityCheckIssue issue = integrityCheckIndexPartition(grpCtx, cpFlag);
+
+                                return new T2<>(grpCtx.groupId(), issue);
+                            }
+                        });
+
+                integrityCheckFutures.add(checkFut);
+            }
 
-        int curFut = 0;
-        try {
             for (Future<T2<Integer, IndexIntegrityCheckIssue>> fut : integrityCheckFutures) {
                 T2<Integer, IndexIntegrityCheckIssue> res = fut.get();
 
@@ -333,42 +362,33 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
 
             throw unwrapFutureException(e);
         }
+        finally {
+            if (db instanceof GridCacheDatabaseSharedManager && lsnr != null)
+                ((GridCacheDatabaseSharedManager)db).removeCheckpointListener(lsnr);
+        }
 
         return integrityCheckResults;
     }
 
     /**
      * @param gctx Cache group context.
+     * @param cpFlag Checkpoint status flag.
      */
-    private IndexIntegrityCheckIssue integrityCheckIndexPartition(CacheGroupContext gctx) {
+    private IndexIntegrityCheckIssue integrityCheckIndexPartition(CacheGroupContext gctx, AtomicBoolean cpFlag) {
         GridKernalContext ctx = ignite.context();
         GridCacheSharedContext cctx = ctx.cache().context();
 
         try {
             FilePageStoreManager pageStoreMgr = (FilePageStoreManager)cctx.pageStore();
 
-            if (pageStoreMgr == null)
-                return null;
-
-            int pageSz = gctx.dataRegion().pageMemory().pageSize();
-
-            PageStore pageStore = pageStoreMgr.getStore(gctx.groupId(), PageIdAllocator.INDEX_PARTITION);
-
-            long pageId = PageIdUtils.pageId(PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX, 0);
-
-            ByteBuffer buf = ByteBuffer.allocateDirect(pageSz);
-
-            buf.order(ByteOrder.nativeOrder());
-
-            for (int pageNo = 0; pageNo < pageStore.pages(); pageId++, pageNo++) {
-                buf.clear();
-
-                pageStore.read(pageId, buf, true);
-            }
+            IdleVerifyUtility.checkPartitionsPageCrcSum(pageStoreMgr, gctx, INDEX_PARTITION, FLAG_IDX, cpFlag);
 
             return null;
         }
         catch (Throwable t) {
+            if (cpFlag.get())
+                throw new GridNotIdleException("Checkpoint with dirty pages started! Cluster not idle!", t);
+
             log.error("Integrity check of index partition of cache group " + gctx.cacheOrGroupName() + " failed", t);
 
             return new IndexIntegrityCheckIssue(gctx.cacheOrGroupName(), t);
@@ -728,5 +748,4 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
         else
             return new IgniteException(e.getCause());
     }
-
 }
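
ValidateIndexesClosure above and VerifyBackupPartitionsTaskV2 earlier in the patch share one idle-detection idiom: register a DbCheckpointListener that flips an AtomicBoolean once a checkpoint with dirty pages begins, bail out early via IdleVerifyUtility.isCheckpointNow(...), and convert any failure observed while the flag is set into GridNotIdleException. The stripped-down sketch below is illustrative only; it reuses just the classes this commit introduces or touches:

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
    import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
    import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
    import org.apache.ignite.internal.processors.cache.verify.GridNotIdleException;
    import org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility;

    /** Illustrative helper showing the checkpoint-guard idiom used by the verification tasks. */
    class CheckpointGuardSketch {
        /** Runs {@code check} and reports a non-idle cluster if a checkpoint interferes. */
        static void runGuarded(IgniteCacheDatabaseSharedManager db, Runnable check) {
            AtomicBoolean cpFlag = new AtomicBoolean();

            DbCheckpointListener lsnr = new DbCheckpointListener() {
                @Override public void onMarkCheckpointBegin(Context ctx) {
                    /* No-op. */
                }

                @Override public void onCheckpointBegin(Context ctx) {
                    // Dirty pages are about to be written: the cluster is not idle.
                    if (ctx.hasPages())
                        cpFlag.set(true);
                }
            };

            if (db instanceof GridCacheDatabaseSharedManager)
                ((GridCacheDatabaseSharedManager)db).addCheckpointListener(lsnr);

            try {
                if (IdleVerifyUtility.isCheckpointNow(db))
                    throw new GridNotIdleException(IdleVerifyUtility.CLUSTER_NOT_IDLE_MSG);

                try {
                    check.run();
                }
                catch (RuntimeException | AssertionError e) {
                    if (cpFlag.get())
                        throw new GridNotIdleException("Checkpoint with dirty pages started! Cluster not idle!", e);

                    throw e;
                }
            }
            finally {
                if (db instanceof GridCacheDatabaseSharedManager)
                    ((GridCacheDatabaseSharedManager)db).removeCheckpointListener(lsnr);
            }
        }
    }

This keeps false CRC alarms out of the report: a failure that coincides with a checkpoint is attributed to the cluster not being idle rather than to disk corruption.
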
index 3555cef..a9168f3 100644 (file)
@@ -254,9 +254,13 @@ public class IgnitePersistentStoreSchemaLoadTest extends GridCommonAbstractTest
         GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)node.context().cache().context().database();
 
         db.addCheckpointListener(new DbCheckpointListener() {
-            @Override public void onCheckpointBegin(Context ctx) {
+            @Override public void onMarkCheckpointBegin(Context ctx) {
                 cnt.countDown();
             }
+
+            @Override public void onCheckpointBegin(Context ctx) {
+                /* No-op. */
+            }
         });
 
         return cnt;