IGNITE-7533 Throttle writing threads according to fsync progress - Fixes #3437.
author: dpavlov <dpavlov@gridgain.com>
Wed, 7 Feb 2018 07:37:51 +0000 (10:37 +0300)
committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Wed, 7 Feb 2018 07:37:51 +0000 (10:37 +0300)
Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>
18 files changed:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IntervalBasedMeasurement.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteCheckpointDirtyPagesForLowLoadTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/ProgressWatchdog.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreePageMemoryImplTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/BPlusTreeReuseListPageMemoryImplTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IndexStoragePageMemoryImplTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplNoLoadTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java

index 5dc81c5..bd80ec8 100755 (executable)
@@ -55,6 +55,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -129,7 +130,6 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageParti
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -156,6 +156,7 @@ import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedHashMap;
 
 import static java.nio.file.StandardOpenOption.READ;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
@@ -355,7 +356,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** Counter for written checkpoint pages. Not null only if checkpoint is running. */
     private volatile AtomicInteger writtenPagesCntr = null;
 
-    /** Number of pages in current checkpoint. */
+    /** Counter for fsynced checkpoint pages. Not null only if checkpoint is running. */
+    private volatile AtomicInteger syncedPagesCntr = null;
+
+    /** Counter for evicted checkpoint pages. Not null only if checkpoint is running. */
+    private volatile AtomicInteger evictedPagesCntr = null;
+
+    /** Number of pages in current checkpoint at the beginning of checkpoint. */
     private volatile int currCheckpointPagesCnt;
 
     /** */
@@ -933,10 +940,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             chpBufSize = cacheSize;
         }
 
-        boolean writeThrottlingEnabled = persistenceCfg.isWriteThrottlingEnabled();
+        PageMemoryImpl.ThrottlingPolicy plc = persistenceCfg.isWriteThrottlingEnabled()
+            ? PageMemoryImpl.ThrottlingPolicy.SPEED_BASED
+            : PageMemoryImpl.ThrottlingPolicy.NONE;
+
+        String val = IgniteSystemProperties.getString(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED);
 
-        if (IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED, false))
-            writeThrottlingEnabled = true;
+        if (val != null) {
+            if ("ratio".equalsIgnoreCase(val))
+                plc = PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED;
+            else if ("speed".equalsIgnoreCase(val) || Boolean.valueOf(val))
+                plc = PageMemoryImpl.ThrottlingPolicy.SPEED_BASED;
+        }
 
         GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker;
 
@@ -974,12 +989,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                     // Only after write we can write page into snapshot.
                     snapshotMgr.flushDirtyPageHandler(fullId, pageBuf, tag);
+
+                    AtomicInteger cntr = evictedPagesCntr;
+
+                    if (cntr != null)
+                        cntr.incrementAndGet();
                 }
             },
             changeTracker,
             this,
             memMetrics,
-            writeThrottlingEnabled
+            plc
         );
 
         memMetrics.pageMemory(pageMem);
@@ -2610,6 +2630,20 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * @return Counter for fsynced checkpoint pages. Not null only if checkpoint is running.
+     */
+    public AtomicInteger syncedPagesCounter() {
+        return syncedPagesCntr;
+    }
+
+    /**
+     * @return Counter for evicted pages during current checkpoint. Not null only if checkpoint is running.
+     */
+    public AtomicInteger evictedPagesCntr() {
+        return evictedPagesCntr;
+    }
+
+    /**
      * @return Number of pages in current checkpoint. If checkpoint is not running, returns 0.
      */
     public int currentCheckpointPagesCount() {
@@ -2799,13 +2833,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 currCheckpointPagesCnt = chp.pagesSize;
 
                 writtenPagesCntr = new AtomicInteger();
+                syncedPagesCntr = new AtomicInteger();
+                evictedPagesCntr = new AtomicInteger();
 
                 boolean interrupted = true;
 
                 try {
                     if (chp.hasDelta()) {
                         // Identity stores set.
-                        GridConcurrentHashSet<PageStore> updStores = new GridConcurrentHashSet<>();
+                        ConcurrentLinkedHashMap<PageStore, LongAdder> updStores = new ConcurrentLinkedHashMap<>();
 
                         CountDownFuture doneWriteFut = new CountDownFuture(
                             asyncRunner == null ? 1 : chp.cpPages.collectionsSize());
@@ -2867,14 +2903,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         tracker.onFsyncStart();
 
                         if (!skipSync) {
-                            for (PageStore updStore : updStores) {
+                            for (Map.Entry<PageStore, LongAdder> updStoreEntry : updStores.entrySet()) {
                                 if (shutdownNow) {
                                     chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
 
                                     return;
                                 }
 
-                                updStore.sync();
+                                updStoreEntry.getKey().sync();
+
+                                syncedPagesCntr.addAndGet(updStoreEntry.getValue().intValue());
                             }
                         }
                     }
@@ -3035,14 +3073,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                     if (grp.isLocal() || !grp.walEnabled())
                         continue;
 
-                    List<GridDhtLocalPartition> locParts = new ArrayList<>();
+                    int locPartsSize = 0;
 
-                    for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
-                        locParts.add(part);
+                    for (GridDhtLocalPartition ignored : grp.topology().currentLocalPartitions())
+                        locPartsSize++;
 
-                    Collections.sort(locParts, ASC_PART_COMPARATOR);
-
-                    CacheState state = new CacheState(locParts.size());
+                    CacheState state = new CacheState(locPartsSize);
 
                     for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) {
                         state.addPartitionState(
@@ -3184,6 +3220,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 res.add(nextCpPagesCol);
             }
 
+            currCheckpointPagesCnt = pagesNum;
+
             return new IgniteBiTuple<>(res, pagesNum);
         }
 
@@ -3192,6 +3230,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
          */
         private void markCheckpointEnd(Checkpoint chp) throws IgniteCheckedException {
             synchronized (this) {
+                writtenPagesCntr = null;
+                syncedPagesCntr = null;
+                evictedPagesCntr = null;
+
                 for (DataRegion memPlc : dataRegions()) {
                     if (!memPlc.config().isPersistenceEnabled())
                         continue;
@@ -3208,8 +3250,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         null,
                         CheckpointEntryType.END);
 
-                writtenPagesCntr = null;
-
                 currCheckpointPagesCnt = 0;
             }
 
@@ -3261,7 +3301,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         }
 
         if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) {
-            Collections.sort(cpPagesList, new Comparator<FullPageId>() {
+            FullPageId[] objects = cpPagesList.toArray(new FullPageId[cpPagesList.size()]);
+
+            Arrays.parallelSort(objects, new Comparator<FullPageId>() {
                 @Override public int compare(FullPageId o1, FullPageId o2) {
                     int cmp = Long.compare(o1.groupId(), o2.groupId());
                     if (cmp != 0)
@@ -3271,6 +3313,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         PageIdUtils.effectivePageId(o2.pageId()));
                 }
             });
+
+            cpPagesList = Arrays.asList(objects);
         }
 
         int cpThreads = persistenceCfg.getCheckpointThreads();
@@ -3302,7 +3346,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         private Collection<FullPageId> writePageIds;
 
         /** */
-        private GridConcurrentHashSet<PageStore> updStores;
+        private ConcurrentLinkedHashMap<PageStore, LongAdder> updStores;
 
         /** */
         private CountDownFuture doneFut;
@@ -3322,7 +3366,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         private WriteCheckpointPages(
             final CheckpointMetricsTracker tracker,
             final Collection<FullPageId> writePageIds,
-            final GridConcurrentHashSet<PageStore> updStores,
+            final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores,
             final CountDownFuture doneFut,
             final int totalPagesToWrite) {
             this.tracker = tracker;
@@ -3398,7 +3442,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                         PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag, false);
 
-                        updStores.add(store);
+                        updStores.computeIfAbsent(store, k -> new LongAdder()).increment();
                     }
                 }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IntervalBasedMeasurement.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IntervalBasedMeasurement.java
new file mode 100644 (file)
index 0000000..db4da6b
--- /dev/null
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Speed tracker to determine the speed of processing based on increments or exact counter values. <br>
+ * Measurement is performed using several intervals (1 current + 3 historical by default). <br>
+ * Too old measurements (intervals) may be dropped if automatic switch mode is activated.<br>
+ * To determine speed current measurement is reduced with all historical.<br>
+ *     <br>
+ *  For mode of manual measurements switch it is possible to use
+ *  <br> default ctor
+ *  {@link #IntervalBasedMeasurement()} and methods <br>
+ *     {@link #setCounter(long, long)} (automatically opens interval if not opened) and <br>
+ *     {@link #finishInterval()} to close measurement.<br>
+ *     <br>
+ *  For mode of automatic measurements switch it is possible to use
+ *  <br> parametrized ctor
+ *  {@link #IntervalBasedMeasurement(int, int)} and methods <br>
+ *     {@link #setCounter(long, long)} (automatically opens interval if not opened) or
+ *     {@link #addMeasurementForAverageCalculation(long)} to provide metrics value in addition to event.<br>
+ *     {@link #finishInterval()} is also supported, but not required<br>
+ *     <br>
+ *
+ *  To get results of speed calculation it is possible to use <br>
+ *  Method {@link #getSpeedOpsPerSec(long)} to get current speed (and switch/open interval if needed). <br>
+ *   or method {@link #getSpeedOpsPerSecReadOnly()} to get current speed without interval modification.<br>
+ *
+ *  If metric value was provided using {@link #addMeasurementForAverageCalculation(long)}
+ *  then method {@link #getAverage()} can be used to get resulting metrics average value during period of time.
+ */
+class IntervalBasedMeasurement {
+    /** Nanos in second. */
+    private static final long NANOS_IN_SECOND = TimeUnit.SECONDS.toNanos(1);
+
+    /** Current Measurement interval atomic reference. */
+    private AtomicReference<MeasurementInterval> measurementIntervalAtomicRef = new AtomicReference<>();
+
+    /** Interval automatic switch nanoseconds. Negative value means no automatic switch. */
+    private final long intervalSwitchNanos;
+
+    /** Max historical measurements to keep. */
+    private final int maxMeasurements;
+
+    /**
+     * Previous (historical) measurements. One thread can write (winner in CAS of {@link
+     * #measurementIntervalAtomicRef}), all other threads may read.
+     */
+    private final ConcurrentLinkedQueue<MeasurementInterval> prevMeasurements = new ConcurrentLinkedQueue<>();
+
+    /**
+     * Default constructor. No automatic switch, 3 historical measurements.
+     */
+    IntervalBasedMeasurement() {
+        this(-1, 3);
+    }
+
+    /**
+     * @param intervalSwitchMs Interval switch milliseconds.
+     * @param maxMeasurements Max historical measurements to keep.
+     */
+    IntervalBasedMeasurement(int intervalSwitchMs, int maxMeasurements) {
+        this.intervalSwitchNanos = intervalSwitchMs > 0 ? intervalSwitchMs * TimeUnit.MILLISECONDS.toNanos(1) : -1;
+        this.maxMeasurements = maxMeasurements;
+    }
+
+    /**
+     * Gets speed, start interval (if not started).
+     *
+     * @param curNanoTime current time nanos.
+     * @return speed in operations per second based on current data.
+     */
+    long getSpeedOpsPerSec(long curNanoTime) {
+        return calcSpeed(interval(curNanoTime), curNanoTime);
+    }
+
+    /**
+     * Gets current speed, does not start measurement.
+     *
+     * @return speed in operations per second based on current data.
+     */
+    long getSpeedOpsPerSecReadOnly() {
+        MeasurementInterval interval = measurementIntervalAtomicRef.get();
+
+        long curNanoTime = System.nanoTime();
+
+        return calcSpeed(interval, curNanoTime);
+    }
+
+    /**
+     * Reduce measurements to calculate average speed.
+     *
+     * @param interval current measurement.
+     * @param curNanoTime current time in nanoseconds.
+     * @return speed in operations per second from the current and historical measurements.
+     */
+    private long calcSpeed(@Nullable MeasurementInterval interval, long curNanoTime) {
+        long nanosPassed = 0;
+        long opsDone = 0;
+
+        if (!isOutdated(interval, curNanoTime)) {
+            nanosPassed += curNanoTime - interval.startNanoTime;
+            opsDone += interval.cntr.get();
+        }
+
+        for (MeasurementInterval prevMeasurement : prevMeasurements) {
+            if (!isOutdated(prevMeasurement, curNanoTime)) {
+                nanosPassed += prevMeasurement.endNanoTime - prevMeasurement.startNanoTime;
+                opsDone += prevMeasurement.cntr.get();
+            }
+        }
+
+        return nanosPassed <= 0 ? 0 : opsDone * NANOS_IN_SECOND / nanosPassed;
+    }
+
+
+
+    /**
+     * @param interval Measurement to check. {@code null} is always outdated.
+     * @param curNanoTime Current time in nanoseconds.
+     * @return {@code True} if measurement is outdated.
+     */
+    private boolean isOutdated(@Nullable final MeasurementInterval interval, long curNanoTime) {
+        if (interval == null)
+            return true;
+
+        long elapsedNs = curNanoTime - interval.startNanoTime;
+
+        if (elapsedNs <= 0)
+            return true; // interval is started only now
+
+        return (intervalSwitchNanos > 0)
+            && elapsedNs > (maxMeasurements + 1) * intervalSwitchNanos;
+    }
+
+    /**
+     * Gets or creates measurement interval, performs switch to new measurement by timeout.
+     * @param curNanoTime current nano time.
+     * @return interval to use.
+     */
+    @NotNull private MeasurementInterval interval(long curNanoTime) {
+        MeasurementInterval interval;
+
+        do {
+            interval = measurementIntervalAtomicRef.get();
+            if (interval == null) {
+                MeasurementInterval newInterval = new MeasurementInterval(curNanoTime);
+
+                if (measurementIntervalAtomicRef.compareAndSet(null, newInterval))
+                    interval = newInterval;
+                else
+                    continue;
+            }
+
+            if (intervalSwitchNanos > 0 && (curNanoTime - interval.startNanoTime) > intervalSwitchNanos) {
+                MeasurementInterval newInterval = new MeasurementInterval(curNanoTime);
+
+                if (measurementIntervalAtomicRef.compareAndSet(interval, newInterval)) {
+                    interval.endNanoTime = curNanoTime;
+
+                    pushToHistory(interval);
+                }
+            }
+        }
+        while (interval == null);
+
+        return interval;
+    }
+
+    /**
+     * @param interval finished interval to push to history.
+     */
+    private void pushToHistory(MeasurementInterval interval) {
+        prevMeasurements.offer(interval);
+
+        if (prevMeasurements.size() > maxMeasurements)
+            prevMeasurements.remove();
+    }
+
+    /**
+     * Set exact value for counter in current measurement interval, useful only for manually managed measurements.
+     *
+     * @param val new value to set.
+     * @param curNanoTime current nano time.
+     */
+    void setCounter(long val, long curNanoTime) {
+        interval(curNanoTime).cntr.set(val);
+    }
+
+    /**
+     * Manually switch interval to empty (not started measurement).
+     */
+    void finishInterval() {
+        while (true) {
+            MeasurementInterval interval = measurementIntervalAtomicRef.get();
+
+            if (interval == null)
+                return;
+
+            if (measurementIntervalAtomicRef.compareAndSet(interval, null)) {
+                interval.endNanoTime = System.nanoTime();
+
+                pushToHistory(interval);
+
+                return;
+            }
+        }
+    }
+
+    /**
+     * Gets average metric value previously reported by {@link #addMeasurementForAverageCalculation(long)}.
+     * This method may start new interval measurement or switch current.
+     *
+     * @return average metric value.
+     */
+    public long getAverage() {
+        long time = System.nanoTime();
+
+        return avgMeasurementWithHistorical(interval(time), time);
+    }
+
+    /**
+     * Reduce measurements to calculate average value.
+     *
+     * @param interval current measurement. If null, only historical is used.
+     * @param curNanoTime current time in nanoseconds.
+     * @return average measured value.
+     */
+    private long avgMeasurementWithHistorical(@Nullable MeasurementInterval interval, long curNanoTime) {
+        long cnt = 0;
+        long sum = 0;
+        if (!isOutdated(interval, curNanoTime)) {
+            cnt += interval.cntr.get();
+            sum += interval.sum.get();
+        }
+        for (MeasurementInterval prevMeasurement : prevMeasurements) {
+            if (!isOutdated(prevMeasurement, curNanoTime)) {
+                cnt += prevMeasurement.cntr.get();
+                sum += prevMeasurement.sum.get();
+            }
+        }
+
+        return cnt <= 0 ? 0 : sum / cnt;
+    }
+
+    /**
+     * Adds measurement to be used for average calculation. Calling this method also makes it possible to later
+     * calculate the speed of incoming measurements. Result can be taken from {@link #getAverage()}.
+     *
+     * @param val value measured now, to be used for average calculation.
+     */
+    void addMeasurementForAverageCalculation(long val) {
+        MeasurementInterval interval = interval(System.nanoTime());
+
+        interval.cntr.incrementAndGet();
+        interval.sum.addAndGet(val);
+    }
+
+    /**
+     * Measurement interval, completed or open.
+     */
+    private static class MeasurementInterval {
+        /** Counter of performed operations, pages. */
+        private AtomicLong cntr = new AtomicLong();
+
+        /** Sum of measured value, used only for average calculation. */
+        private AtomicLong sum = new AtomicLong();
+
+        /** Timestamp in nanoseconds of measurement start. */
+        private final long startNanoTime;
+
+        /** Timestamp in nanoseconds of measurement end. 0 for open (running) measurements.*/
+        private volatile long endNanoTime;
+
+        /**
+         * @param startNanoTime Timestamp of measurement start.
+         */
+        MeasurementInterval(long startNanoTime) {
+            this.startNanoTime = startNanoTime;
+        }
+    }
+}
index 496a7b1..e4c369d 100755 (executable)
@@ -226,10 +226,10 @@ public class PageMemoryImpl implements PageMemoryEx {
     @Nullable private final GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker;
 
     /** Pages write throttle. */
-    private PagesWriteThrottle writeThrottle;
+    private PagesWriteThrottlePolicy writeThrottle;
 
-    /** Write throttle enabled flag. */
-    private boolean throttleEnabled;
+    /** Write throttle type. */
+    private ThrottlingPolicy throttlingPlc;
 
     /**  */
     private boolean pageEvictWarned;
@@ -246,7 +246,7 @@ public class PageMemoryImpl implements PageMemoryEx {
      * @param pageSize Page size.
      * @param flushDirtyPage Callback invoked when a dirty page is evicted.
      * @param changeTracker Callback invoked to track changes in pages.
-     * @param throttleEnabled Write throttle enabled flag.
+     * @param throttlingPlc Throttling policy: whether write throttling is enabled and, if so, its type.
      */
     public PageMemoryImpl(
         DirectMemoryProvider directMemoryProvider,
@@ -257,7 +257,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         @Nullable GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker,
         CheckpointLockStateChecker stateChecker,
         DataRegionMetricsImpl memMetrics,
-        boolean throttleEnabled
+        ThrottlingPolicy throttlingPlc
     ) {
         assert ctx != null;
 
@@ -269,7 +269,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         this.flushDirtyPage = flushDirtyPage;
         this.changeTracker = changeTracker;
         this.stateChecker = stateChecker;
-        this.throttleEnabled = throttleEnabled;
+        this.throttlingPlc = throttlingPlc;
 
         storeMgr = ctx.pageStore();
         walMgr = ctx.wal();
@@ -320,7 +320,7 @@ public class PageMemoryImpl implements PageMemoryEx {
 
             totalAllocated += reg.size();
 
-            segments[i] = new Segment(i, regions.get(i), checkpointPool.pages() / segments.length, throttleEnabled);
+            segments[i] = new Segment(i, regions.get(i), checkpointPool.pages() / segments.length, throttlingPlc);
 
             pages += segments[i].pages();
             totalTblSize += segments[i].tableSize();
@@ -344,11 +344,17 @@ public class PageMemoryImpl implements PageMemoryEx {
             log.error("Write throttle can't start. Unexpected class of database manager: " +
                 ctx.database().getClass());
 
-            throttleEnabled = false;
+            throttlingPlc = ThrottlingPolicy.NONE;
         }
 
-        if (throttleEnabled)
-            writeThrottle = new PagesWriteThrottle(this, (GridCacheDatabaseSharedManager)ctx.database());
+        if (isThrottlingEnabled()) {
+            GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ctx.database();
+
+            if (throttlingPlc == ThrottlingPolicy.SPEED_BASED)
+                writeThrottle = new PagesWriteSpeedBasedThrottle(this, db, log);
+            else if(throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED)
+                writeThrottle = new PagesWriteThrottle(this, db);
+        }
     }
 
     /** {@inheritDoc} */
@@ -431,6 +437,9 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         assert ctx.database().checkpointLockIsHeldByThread();
 
+        if (isThrottlingEnabled())
+            writeThrottle.onMarkDirty(false);
+
         long pageId = storeMgr.allocatePage(cacheId, partId, flags);
 
         assert PageIdUtils.pageIndex(pageId) > 0; //it's crucial for tracking pages (zero page is super one)
@@ -876,6 +885,32 @@ public class PageMemoryImpl implements PageMemoryEx {
         return false;
     }
 
+    /**
+     * @return Max dirty ratio from the segments.
+     */
+    double getDirtyPagesRatio() {
+        double res = 0;
+
+        for (Segment segment : segments) {
+            res = Math.max(res, segment.getDirtyPagesRatio());
+        }
+
+        return res;
+    }
+
+    /**
+     * @return Total pages can be placed in all segments.
+     */
+    public long totalPages() {
+        long res = 0;
+
+        for (Segment segment : segments) {
+            res += segment.pages();
+        }
+
+        return res;
+    }
+
     /** {@inheritDoc} */
     @Override public GridMultiCollectionWrapper<FullPageId> beginCheckpoint() throws IgniteException {
         if (segments == null)
@@ -896,9 +931,19 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         memMetrics.resetDirtyPages();
 
+        if (isThrottlingEnabled())
+            writeThrottle.onBeginCheckpoint();
+
         return new GridMultiCollectionWrapper<>(collections);
     }
 
+    /**
+     * @return {@code True} if throttling is enabled.
+     */
+    private boolean isThrottlingEnabled() {
+        return throttlingPlc != ThrottlingPolicy.NONE;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked", "TooBroadScope"})
     @Override public void finishCheckpoint() {
@@ -908,7 +953,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         for (Segment seg : segments)
             seg.segCheckpointPages = null;
 
-        if (throttleEnabled)
+        if (isThrottlingEnabled())
             writeThrottle.onFinishCheckpoint();
     }
 
@@ -1327,7 +1372,11 @@ public class PageMemoryImpl implements PageMemoryEx {
 
     /**
      * @param page Page pointer.
+     * @param fullId full page ID.
+     *
      * @param walPlc Full page WAL record policy.
+     * @param markDirty set dirty flag to page.
+     * @param restore {@code True} if the page is being restored; such a page should not be marked as changed.
      */
     private void writeUnlockPage(
         long page,
@@ -1336,13 +1385,13 @@ public class PageMemoryImpl implements PageMemoryEx {
         boolean markDirty,
         boolean restore
     ) {
-        boolean dirty = isDirty(page);
+        boolean wasDirty = isDirty(page);
 
         //if page is for restore, we shouldn't mark it as changed
-        if (!restore && markDirty && !dirty && changeTracker != null)
+        if (!restore && markDirty && !wasDirty && changeTracker != null)
             changeTracker.apply(page, fullId, this);
 
-        boolean pageWalRec = markDirty && walPlc != FALSE && (walPlc == TRUE || !dirty);
+        boolean pageWalRec = markDirty && walPlc != FALSE && (walPlc == TRUE || !wasDirty);
 
         assert GridUnsafe.getInt(page + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480
 
@@ -1360,7 +1409,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         try {
             rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId));
 
-            if (throttleEnabled && !restore && markDirty)
+            if (isThrottlingEnabled() && !restore && markDirty && !wasDirty)
                 writeThrottle.onMarkDirty(isInCheckpoint(fullId));
         }
         catch (AssertionError ex) {
@@ -1449,12 +1498,13 @@ public class PageMemoryImpl implements PageMemoryEx {
     /**
      * This method must be called in synchronized context.
      *
+     * @param pageId full page ID.
      * @param absPtr Absolute pointer.
      * @param dirty {@code True} dirty flag.
      * @param forceAdd If this flag is {@code true}, then the page will be added to the dirty set regardless whether the
      * old flag was dirty or not.
      */
-    void setDirty(FullPageId pageId, long absPtr, boolean dirty, boolean forceAdd) {
+    private void setDirty(FullPageId pageId, long absPtr, boolean dirty, boolean forceAdd) {
         boolean wasDirty = PageHeader.dirty(absPtr, dirty);
 
         if (dirty) {
@@ -1733,9 +1783,9 @@ public class PageMemoryImpl implements PageMemoryEx {
 
         /**
          * @param region Memory region.
-         * @param throttlingEnabled Write throttling enabled flag.
+         * @param throttlingPlc Policy that determines whether write throttling is enabled, and its type.
          */
-        private Segment(int idx, DirectMemoryRegion region, int cpPoolPages, boolean throttlingEnabled) {
+        private Segment(int idx, DirectMemoryRegion region, int cpPoolPages, ThrottlingPolicy throttlingPlc) {
             long totalMemory = region.size();
 
             int pages = (int)(totalMemory / sysPageSize);
@@ -1752,7 +1802,9 @@ public class PageMemoryImpl implements PageMemoryEx {
 
             pool = new PagePool(idx, poolRegion, null);
 
-            maxDirtyPages = throttlingEnabled ? pool.pages() * 3 / 4 : Math.min(pool.pages() * 2 / 3, cpPoolPages);
+            maxDirtyPages = throttlingPlc != ThrottlingPolicy.NONE
+                ? pool.pages() * 3 / 4
+                : Math.min(pool.pages() * 2 / 3, cpPoolPages);
         }
 
         /**
@@ -1780,7 +1832,14 @@ public class PageMemoryImpl implements PageMemoryEx {
          * @param dirtyRatioThreshold Throttle threshold.
          */
         private boolean shouldThrottle(double dirtyRatioThreshold) {
-            return ((double)dirtyPages.size()) / pages() > dirtyRatioThreshold;
+            return getDirtyPagesRatio() > dirtyRatioThreshold;
+        }
+
+        /**
+         * @return Ratio of dirty pages to be compared with the throttle threshold.
+         */
+        private double getDirtyPagesRatio() {
+            return ((double)dirtyPages.size()) / pages();
         }
 
         /**
@@ -2605,4 +2664,13 @@ public class PageMemoryImpl implements PageMemoryEx {
             }
         }
     }
+
+    /**
+     * Throttling enabled and its type enum.
+     */
+    public enum ThrottlingPolicy {
+        /** Not throttled. */NONE,
+        /** Target ratio based: CP progress is used as border. */ TARGET_RATIO_BASED,
+        /** Speed based. CP writing speed and estimated ideal speed are used as border. */ SPEED_BASED
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
new file mode 100644 (file)
index 0000000..cb19eca
--- /dev/null
@@ -0,0 +1,519 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+
+/**
+ * Throttles threads that generate dirty pages during ongoing checkpoint.
+ * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed.
+ * Uses average checkpoint write speed and moment speed of marking pages as dirty.
+ */
+public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
+    /** Maximum dirty pages in region. */
+    private static final double MAX_DIRTY_PAGES = 0.75;
+
+    /** Page memory. */
+    private final PageMemoryImpl pageMemory;
+
+    /** Database manager. */
+    private final GridCacheDatabaseSharedManager dbSharedMgr;
+
+    /** Starting throttle time. Limits write speed to 1000 MB/s. */
+    private static final long STARTING_THROTTLE_NANOS = 4000;
+
+    /** Backoff ratio. Each next park will be this times longer. */
+    private static final double BACKOFF_RATIO = 1.05;
+
+    /** Percent of dirty pages which will not cause throttling. */
+    private static final double MIN_RATIO_NO_THROTTLE = 0.03;
+
+    /** Exponential backoff counter. */
+    private final AtomicInteger exponentialBackoffCntr = new AtomicInteger(0);
+
+    /** Counter of written pages from checkpoint. Value is saved here for detecting checkpoint start. */
+    private final AtomicInteger lastObservedWritten = new AtomicInteger(0);
+
+    /**
+     * Dirty pages ratio was observed at checkpoint start (here start is moment when first page was actually saved to
+     * store). This ratio is excluded from throttling.
+     */
+    private volatile double initDirtyRatioAtCpBegin = MIN_RATIO_NO_THROTTLE;
+
+    /**
+     * Target (maximum) dirty pages ratio, after which throttling will start using
+     * {@link #getParkTime(double, long, int, int, long, long)}.
+     */
+    private volatile double targetDirtyRatio;
+
+    /**
+     * Current dirty pages ratio (percent of dirty pages in most used segment), negative value means no cp is running.
+     */
+    private volatile double currDirtyRatio;
+
+    /** Speed average checkpoint write speed. Current and 3 past checkpoints used. Pages/second. */
+    private final IntervalBasedMeasurement speedCpWrite = new IntervalBasedMeasurement();
+
+    /** Last estimated speed for marking all clear pages as dirty till the end of checkpoint. */
+    private volatile long speedForMarkAll;
+
+    /** Threads set. Contains identifiers of all threads which were marking pages for current checkpoint. */
+    private final GridConcurrentHashSet<Long> threadIds = new GridConcurrentHashSet<>();
+
+    /**
+     * Used for calculating speed of marking pages dirty.
+     * Value from past 750-1000 millis only.
+     * {@link IntervalBasedMeasurement#getSpeedOpsPerSec(long)} returns pages marked/second.
+     * {@link IntervalBasedMeasurement#getAverage()} returns average throttle time.
+     * */
+    private final IntervalBasedMeasurement speedMarkAndAvgParkTime = new IntervalBasedMeasurement(250, 3);
+
+    /** Total pages which is possible to store in page memory. */
+    private long totalPages;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Previous warning time, nanos. */
+    private AtomicLong prevWarnTime = new AtomicLong();
+
+    /** Warning min delay nanoseconds. */
+    private static final long WARN_MIN_DELAY_NS = TimeUnit.SECONDS.toNanos(10);
+
+    /** Warning threshold: minimal level of pressure that causes warning messages to log. */
+    static final double WARN_THRESHOLD = 0.2;
+
+    /**
+     * @param pageMemory Page memory.
+     * @param dbSharedMgr Database manager.
+     * @param log Logger.
+     */
+    public PagesWriteSpeedBasedThrottle(PageMemoryImpl pageMemory,
+        GridCacheDatabaseSharedManager dbSharedMgr, IgniteLogger log) {
+        this.pageMemory = pageMemory;
+        this.dbSharedMgr = dbSharedMgr;
+        totalPages = pageMemory.totalPages();
+        this.log = log;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMarkDirty(boolean isPageInCheckpoint) {
+        assert dbSharedMgr.checkpointLockIsHeldByThread();
+
+        AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter();
+
+        if (writtenPagesCntr == null) {
+            speedForMarkAll = 0;
+            targetDirtyRatio = -1;
+            currDirtyRatio = -1;
+
+            return; // Don't throttle if checkpoint is not running.
+        }
+
+        int cpWrittenPages = writtenPagesCntr.get();
+
+        long fullyCompletedPages = (cpWrittenPages + cpSyncedPages()) / 2; // written & sync'ed
+
+        long curNanoTime = System.nanoTime();
+
+        speedCpWrite.setCounter(fullyCompletedPages, curNanoTime);
+
+        long markDirtySpeed = speedMarkAndAvgParkTime.getSpeedOpsPerSec(curNanoTime);
+
+        long curCpWriteSpeed = speedCpWrite.getSpeedOpsPerSec(curNanoTime);
+
+        threadIds.add(Thread.currentThread().getId());
+
+        ThrottleMode level = ThrottleMode.NO; //should apply delay (throttling) for current page modification
+
+        if (isPageInCheckpoint) {
+            int checkpointBufLimit = pageMemory.checkpointBufferPagesSize() * 2 / 3;
+
+            if (pageMemory.checkpointBufferPagesCount() > checkpointBufLimit)
+                level = ThrottleMode.EXPONENTIAL;
+        }
+
+        long throttleParkTimeNs = 0;
+
+        if (level == ThrottleMode.NO) {
+            int nThreads = threadIds.size();
+
+            int cpTotalPages = cpTotalPages();
+
+            if (cpTotalPages == 0) {
+                boolean throttleByCpSpeed = curCpWriteSpeed > 0 && markDirtySpeed > curCpWriteSpeed;
+
+                if (throttleByCpSpeed) {
+                    throttleParkTimeNs = calcDelayTime(curCpWriteSpeed, nThreads, 1);
+
+                    level = ThrottleMode.LIMITED;
+                }
+            }
+            else {
+                double dirtyPagesRatio = pageMemory.getDirtyPagesRatio();
+
+                currDirtyRatio = dirtyPagesRatio;
+
+                detectCpPagesWriteStart(cpWrittenPages, dirtyPagesRatio);
+
+                if (dirtyPagesRatio >= MAX_DIRTY_PAGES)
+                    level = ThrottleMode.NO; // too late to throttle, will wait on safe to update instead.
+                else {
+                    int notEvictedPagesTotal = cpTotalPages - cpEvictedPages();
+
+                    throttleParkTimeNs = getParkTime(dirtyPagesRatio,
+                        fullyCompletedPages,
+                        notEvictedPagesTotal < 0 ? 0 : notEvictedPagesTotal,
+                        nThreads,
+                        markDirtySpeed,
+                        curCpWriteSpeed);
+
+                    level = ThrottleMode.LIMITED;
+                }
+            }
+        }
+
+        if (level == ThrottleMode.NO) {
+            exponentialBackoffCntr.set(0);
+
+            throttleParkTimeNs = 0;
+        }
+        else if (level == ThrottleMode.EXPONENTIAL) {
+            int exponent = exponentialBackoffCntr.getAndIncrement();
+
+            throttleParkTimeNs = (long)(STARTING_THROTTLE_NANOS * Math.pow(BACKOFF_RATIO, exponent));
+        }
+
+        if (throttleParkTimeNs > 0) {
+            recurrentLogIfNeed();
+
+            doPark(throttleParkTimeNs);
+        }
+
+        speedMarkAndAvgParkTime.addMeasurementForAverageCalculation(throttleParkTimeNs);
+    }
+
+    /**
+     * Disables the current thread for thread scheduling purposes. May be overridden by subclasses for tests.
+     *
+     * @param throttleParkTimeNs the maximum number of nanoseconds to wait
+     */
+    protected void doPark(long throttleParkTimeNs) {
+        LockSupport.parkNanos(throttleParkTimeNs);
+    }
+
+    /**
+     * @return number of written pages.
+     */
+    private int cpWrittenPages() {
+        AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter();
+
+        return writtenPagesCntr == null ? 0 : writtenPagesCntr.get();
+    }
+
+    /**
+     * @return Number of pages in current checkpoint.
+     */
+    private int cpTotalPages() {
+        return dbSharedMgr.currentCheckpointPagesCount();
+    }
+
+    /**
+     * @return Number of fsynced checkpoint pages.
+     */
+    private int cpSyncedPages() {
+        AtomicInteger syncedPagesCntr = dbSharedMgr.syncedPagesCounter();
+
+        return syncedPagesCntr == null ? 0 : syncedPagesCntr.get();
+    }
+
+    /**
+     * @return number of evicted pages.
+     */
+    private int cpEvictedPages() {
+        AtomicInteger evictedPagesCntr = dbSharedMgr.evictedPagesCntr();
+
+        return evictedPagesCntr == null ? 0 : evictedPagesCntr.get();
+    }
+
+    /**
+     * Prints a warning to the log if throttling has occurred and takes a noticeable amount of time.
+     */
+    private void recurrentLogIfNeed() {
+        long prevWarningNs = prevWarnTime.get();
+        long curNs = System.nanoTime();
+
+        if (prevWarningNs != 0 && (curNs - prevWarningNs) <= WARN_MIN_DELAY_NS)
+            return;
+
+        double weight = throttleWeight();
+        if (weight <= WARN_THRESHOLD)
+            return;
+
+        if (prevWarnTime.compareAndSet(prevWarningNs, curNs)) {
+            String msg = String.format("Throttling is applied to page modifications " +
+                    "[percentOfPartTime=%.2f, markDirty=%d pages/sec, checkpointWrite=%d pages/sec, " +
+                    "estIdealMarkDirty=%d pages/sec, curDirty=%.2f, maxDirty=%.2f, avgParkTime=%d ns, " +
+                    "pages: (total=%d, evicted=%d, written=%d, synced=%d, cpBufUsed=%d, cpBufTotal=%d)]",
+                weight, getMarkDirtySpeed(), getCpWriteSpeed(),
+                getLastEstimatedSpeedForMarkAll(), getCurrDirtyRatio(), getTargetDirtyRatio(), throttleParkTime(),
+                cpTotalPages(), cpEvictedPages(), cpWrittenPages(), cpSyncedPages(),
+                pageMemory.checkpointBufferPagesCount(), pageMemory.checkpointBufferPagesSize());
+
+            log.info(msg);
+        }
+    }
+
+    /**
+     * @param dirtyPagesRatio actual percent of dirty pages.
+     * @param fullyCompletedPages written & fsynced pages count.
+     * @param cpTotalPages total checkpoint scope.
+     * @param nThreads number of threads providing data during current checkpoint.
+     * @param markDirtySpeed registered mark dirty speed, pages/sec.
+     * @param curCpWriteSpeed average checkpoint write speed, pages/sec.
+     * @return Time in nanoseconds to park, or 0 if throttling is not required.
+     */
+    long getParkTime(
+        double dirtyPagesRatio,
+        long fullyCompletedPages,
+        int cpTotalPages,
+        int nThreads,
+        long markDirtySpeed,
+        long curCpWriteSpeed) {
+
+        long speedForMarkAll = calcSpeedToMarkAllSpaceTillEndOfCp(dirtyPagesRatio,
+            fullyCompletedPages,
+            curCpWriteSpeed,
+            cpTotalPages);
+
+        double targetDirtyRatio = calcTargetDirtyRatio(fullyCompletedPages, cpTotalPages);
+
+        this.speedForMarkAll = speedForMarkAll; //publish for metrics
+        this.targetDirtyRatio = targetDirtyRatio; //publish for metrics
+
+        boolean lowSpaceLeft = dirtyPagesRatio > targetDirtyRatio && (dirtyPagesRatio + 0.05 > MAX_DIRTY_PAGES);
+        int slowdown = lowSpaceLeft ? 3 : 1;
+
+        double multiplierForSpeedForMarkAll = lowSpaceLeft
+            ? 0.8
+            : 1.0;
+
+        boolean markingTooFast = speedForMarkAll > 0 && markDirtySpeed > multiplierForSpeedForMarkAll * speedForMarkAll;
+        boolean throttleBySizeAndMarkSpeed = dirtyPagesRatio > targetDirtyRatio && markingTooFast;
+
+        // For the case speedForMarkAll >> markDirtySpeed, allow writing a little bit faster than the CP average.
+        double allowWriteFasterThanCp = (speedForMarkAll > 0 && markDirtySpeed > 0 && speedForMarkAll > markDirtySpeed)
+            ? (0.1 * speedForMarkAll / markDirtySpeed)
+            : (dirtyPagesRatio > targetDirtyRatio ? 0.0 : 0.1);
+
+        double fasterThanCpWriteSpeed = lowSpaceLeft
+            ? 1.0
+            : 1.0 + allowWriteFasterThanCp;
+        boolean throttleByCpSpeed = curCpWriteSpeed > 0 && markDirtySpeed > (fasterThanCpWriteSpeed * curCpWriteSpeed);
+
+        long delayByCpWrite = throttleByCpSpeed ? calcDelayTime(curCpWriteSpeed, nThreads, slowdown) : 0;
+        long delayByMarkAllWrite = throttleBySizeAndMarkSpeed ? calcDelayTime(speedForMarkAll, nThreads, slowdown) : 0;
+        return Math.max(delayByCpWrite, delayByMarkAllWrite);
+    }
+
+    /**
+     * @param dirtyPagesRatio current percent of dirty pages.
+     * @param fullyCompletedPages count of written and sync'ed pages
+     * @param curCpWriteSpeed pages/second checkpoint write speed. 0 speed means 'no data'.
+     * @param cpTotalPages total pages in checkpoint.
+     * @return Pages/second rate needed to mark all clean pages as dirty till the end of checkpoint. 0 speed means 'no
+     * data'.
+     */
+    private long calcSpeedToMarkAllSpaceTillEndOfCp(double dirtyPagesRatio,
+        long fullyCompletedPages,
+        long curCpWriteSpeed,
+        int cpTotalPages) {
+
+        if (curCpWriteSpeed == 0)
+            return 0;
+
+        if (cpTotalPages <= 0)
+            return 0;
+
+        if (dirtyPagesRatio >= MAX_DIRTY_PAGES)
+            return 0;
+
+        double remainedClear = (MAX_DIRTY_PAGES - dirtyPagesRatio) * totalPages;
+
+        double timeRemainedSeconds = 1.0 * (cpTotalPages - fullyCompletedPages) / curCpWriteSpeed;
+
+        return (long)(remainedClear / timeRemainedSeconds);
+    }
+
+    /**
+     * @param fullyCompletedPages number of completed.
+     * @param cpTotalPages Total amount of pages under checkpoint.
+     * @return size-based calculation of target ratio.
+     */
+    private double calcTargetDirtyRatio(long fullyCompletedPages, int cpTotalPages) {
+        double cpProgress = ((double)fullyCompletedPages) / cpTotalPages;
+
+        // Starting with initDirtyRatioAtCpBegin to avoid throttling right after checkpoint start.
+        double constStart = initDirtyRatioAtCpBegin;
+
+        double throttleTotalWeight = 1.0 - constStart;
+
+        // .75 is maximum ratio of dirty pages
+        return (cpProgress * throttleTotalWeight + constStart) * MAX_DIRTY_PAGES;
+    }
+
+    /**
+     * @param baseSpeed speed to slow down.
+     * @param nThreads operating threads.
+     * @param coefficient How much the base speed needs to be slowed down; 1.0 means delay to match the base speed exactly.
+     * @return sleep time in nanoseconds.
+     */
+    private long calcDelayTime(long baseSpeed, int nThreads, double coefficient) {
+        if (coefficient <= 0.0)
+            return 0;
+
+        if (baseSpeed <= 0)
+            return 0;
+
+        long updTimeNsForOnePage = TimeUnit.SECONDS.toNanos(1) * nThreads / (baseSpeed);
+
+        return (long)(coefficient * updTimeNsForOnePage);
+    }
+
+    /**
+     * @param cpWrittenPages current counter of written pages.
+     * @param dirtyPagesRatio current percent of dirty pages.
+     */
+    private void detectCpPagesWriteStart(int cpWrittenPages, double dirtyPagesRatio) {
+        if (cpWrittenPages > 0 && lastObservedWritten.compareAndSet(0, cpWrittenPages)) {
+            double newMinRatio = dirtyPagesRatio;
+
+            if (newMinRatio < MIN_RATIO_NO_THROTTLE)
+                newMinRatio = MIN_RATIO_NO_THROTTLE;
+
+            if (newMinRatio > 1)
+                newMinRatio = 1;
+
+            // A previous (possibly slow) checkpoint has completed; drop its recorded dirty page percent.
+            initDirtyRatioAtCpBegin = newMinRatio;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBeginCheckpoint() {
+        speedCpWrite.setCounter(0L, System.nanoTime());
+
+        initDirtyRatioAtCpBegin = MIN_RATIO_NO_THROTTLE;
+
+        lastObservedWritten.set(0);
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public void onFinishCheckpoint() {
+        exponentialBackoffCntr.set(0);
+
+        speedCpWrite.finishInterval();
+        speedMarkAndAvgParkTime.finishInterval();
+        threadIds.clear();
+    }
+
+    /**
+     * @return Average throttle park time in nanoseconds.
+     */
+    public long throttleParkTime() {
+        return speedMarkAndAvgParkTime.getAverage();
+    }
+
+    /**
+     * @return Target (maximum) dirty pages ratio, after which throttling will start.
+     */
+    public double getTargetDirtyRatio() {
+        return targetDirtyRatio;
+    }
+
+    /**
+     * @return Current dirty pages ratio.
+     */
+    public double getCurrDirtyRatio() {
+        double ratio = currDirtyRatio;
+
+        if (ratio >= 0)
+            return ratio;
+
+        return pageMemory.getDirtyPagesRatio();
+    }
+
+    /**
+     * @return  Speed of marking pages dirty. Value from past 750-1000 millis only. Pages/second.
+     */
+    public long getMarkDirtySpeed() {
+        return speedMarkAndAvgParkTime.getSpeedOpsPerSec(System.nanoTime());
+    }
+
+    /**
+     * @return Speed average checkpoint write speed. Current and 3 past checkpoints used. Pages/second.
+     */
+    public long getCpWriteSpeed() {
+        return speedCpWrite.getSpeedOpsPerSecReadOnly();
+    }
+
+    /**
+     * @return Returns {@link #speedForMarkAll}.
+     */
+    public long getLastEstimatedSpeedForMarkAll() {
+        return speedForMarkAll;
+    }
+
+    /**
+     * Measurement shows how much throttling time is involved in the average marking time.
+     * @return Metric starting from 0.0 that shows how much throttling is involved in the current marking process.
+     */
+    public double throttleWeight() {
+        long speed = speedMarkAndAvgParkTime.getSpeedOpsPerSec(System.nanoTime());
+
+        if (speed <= 0)
+            return 0;
+
+        long timeForOnePage = calcDelayTime(speed, threadIds.size(), 1);
+
+        if (timeForOnePage == 0)
+            return 0;
+
+        return 1.0 * throttleParkTime() / timeForOnePage;
+    }
+
+    /**
+     * Throttling mode for page.
+     */
+    private enum ThrottleMode {
+        /** No delay is applied. */
+        NO,
+
+        /** Limited, time is based on target speed. */
+        LIMITED,
+
+        /** Exponential. */
+        EXPONENTIAL
+    }
+}
index a890442..9206935 100644 (file)
@@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabase
  * Throttles threads that generate dirty pages during ongoing checkpoint.
  * Designed to avoid zero dropdowns that can happen if checkpoint buffer is overflowed.
  */
-public class PagesWriteThrottle {
+public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
     /** Page memory. */
     private final PageMemoryImpl pageMemory;
 
@@ -48,11 +48,8 @@ public class PagesWriteThrottle {
         this.dbSharedMgr = dbSharedMgr;
     }
 
-    /**
-     * Callback to apply throttling delay.
-     * @param isInCheckpoint flag indicating if checkpoint is running.
-     */
-    public void onMarkDirty(boolean isInCheckpoint) {
+    /** {@inheritDoc} */
+    @Override public void onMarkDirty(boolean isPageInCheckpoint) {
         assert dbSharedMgr.checkpointLockIsHeldByThread();
 
         AtomicInteger writtenPagesCntr = dbSharedMgr.writtenPagesCounter();
@@ -62,7 +59,7 @@ public class PagesWriteThrottle {
 
         boolean shouldThrottle = false;
 
-        if (isInCheckpoint) {
+        if (isPageInCheckpoint) {
             int checkpointBufLimit = pageMemory.checkpointBufferPagesSize() * 2 / 3;
 
             shouldThrottle = pageMemory.checkpointBufferPagesCount() > checkpointBufLimit;
@@ -96,10 +93,12 @@ public class PagesWriteThrottle {
             exponentialBackoffCntr.set(0);
     }
 
-    /**
-     *
-     */
-    public void onFinishCheckpoint() {
+    /** {@inheritDoc} */
+    @Override public void onBeginCheckpoint() {
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onFinishCheckpoint() {
         exponentialBackoffCntr.set(0);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
new file mode 100644 (file)
index 0000000..adeaa3d
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+/**
+ * Throttling policy, encapsulates logic of delaying write operations.
+ */
+public interface PagesWriteThrottlePolicy {
+    /**
+     * Callback to apply throttling delay.
+     * @param isPageInCheckpoint flag indicating if current page is in scope of current checkpoint.
+     */
+    void onMarkDirty(boolean isPageInCheckpoint);
+
+    /**
+     * Callback to notify throttling policy checkpoint was started.
+     */
+    void onBeginCheckpoint();
+
+    /**
+     * Callback to notify throttling policy checkpoint was finished.
+     */
+    void onFinishCheckpoint();
+}
index 782949f..c17f6cb 100644 (file)
@@ -120,7 +120,7 @@ public class IgniteCheckpointDirtyPagesForLowLoadTest extends GridCommonAbstract
 
             boolean checkpointWithLowNumOfPagesFound = false;
 
-            for (int i = 0; i < 10; i++) {
+            for (int i = 0; i < 20; i++) {
                 Random random = new Random();
                 //touch some entry
                 int d = random.nextInt(PARTS) + PARTS;
@@ -134,18 +134,29 @@ public class IgniteCheckpointDirtyPagesForLowLoadTest extends GridCommonAbstract
                 if (log.isInfoEnabled())
                     log.info("Put to cache [" + fullname + "] value " + d);
 
-                final int timeout = 5000;
+                long start = System.nanoTime();
                 try {
-                    db.wakeupForCheckpoint("").get(timeout, TimeUnit.MILLISECONDS);
+                    final int cpTimeout = 25000;
+
+                    db.wakeupForCheckpoint("").get(cpTimeout, TimeUnit.MILLISECONDS);
                 }
-                catch (IgniteFutureTimeoutCheckedException e) {
+                catch (IgniteFutureTimeoutCheckedException ignored) {
+                    long msPassed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+
+                    log.error("Timeout during waiting for checkpoint to start:" +
+                        " [" + msPassed + "] but checkpoint is not running");
+
                     continue;
                 }
 
+                final int timeout = 5000;
                 int currCpPages = waitForCurrentCheckpointPagesCounterUpdated(db, timeout);
 
-                if (currCpPages < 0)
+                if (currCpPages < 0) {
+                    log.error("Timeout during waiting for checkpoint counter to be updated");
+
                     continue;
+                }
 
                 pageCntObserved.add(currCpPages);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java
new file mode 100644 (file)
index 0000000..c49f08e
--- /dev/null
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.checkpoint;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import junit.framework.TestCase;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.BinaryConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
+
+/**
+ * Sandbox test to measure progress of grid write operations. If no progress occurs during a period of time,
+ * thread dumps are generated.
+ */
+public class IgniteMassLoadSandboxTest extends GridCommonAbstractTest {
+    /** Cache name. Random to cover external stores possible problems. */
+    public static final String CACHE_NAME = "partitioned" + new Random().nextInt(10000000);
+
+    /** Object size - minimal size of object to be placed in cache. */
+    private static final int OBJECT_SIZE = 40000;
+
+    /** Records count to continuous put into cache. */
+    private static final int CONTINUOUS_PUT_RECS_CNT = 300_000;
+
+    /** Put thread: client threads naming for put operation. */
+    private static final String PUT_THREAD = "put-thread";
+
+    /** Get thread: client threads naming for verify operation. */
+    private static final String GET_THREAD = "get-thread";
+
+    /** Option to enable storage verification after test. */
+    private static final boolean VERIFY_STORAGE = false;
+
+    /**
+     * Set WAL archive and work folders to same value.  Activates 'No Archiver' mode.
+     * See {@link FileWriteAheadLogManager#isArchiverEnabled()}.
+     */
+    private boolean setWalArchAndWorkToSameVal;
+
+    /** Option for test run: WAL segments size in bytes. */
+    private int walSegmentSize = 64 * 1024 * 1024;
+
+    /** Option for test run: Custom WAL mode. */
+    private WALMode customWalMode;
+
+    /** Option for test run: Checkpoint frequency. */
+    private int checkpointFrequency = 40 * 1000;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Integer, HugeIndexedObject> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 1024));
+        ccfg.setIndexedTypes(Integer.class, HugeIndexedObject.class);
+        ccfg.setName(CACHE_NAME);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        // Metrics are enabled so ProgressWatchdog can read the dirty pages count from region metrics.
+        DataRegionConfiguration regCfg = new DataRegionConfiguration()
+            .setName("dfltMemPlc")
+            .setMetricsEnabled(true)
+            .setMaxSize(2 * 1024L * 1024 * 1024)
+            .setPersistenceEnabled(true);
+
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration();
+
+        dsCfg.setDefaultDataRegionConfiguration(regCfg)
+            .setPageSize(4 * 1024)
+            .setWriteThrottlingEnabled(true)
+            .setCheckpointFrequency(checkpointFrequency);
+
+        final String workDir = U.defaultWorkDirectory();
+        final File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false);
+        final File wal = new File(db, "wal");
+        // Pointing WAL work and archive to the same directory activates 'no archiver' mode,
+        // see FileWriteAheadLogManager#isArchiverEnabled().
+        if (setWalArchAndWorkToSameVal) {
+            final String walAbsPath = wal.getAbsolutePath();
+
+            dsCfg.setWalPath(walAbsPath);
+
+            dsCfg.setWalArchivePath(walAbsPath);
+        }
+        else {
+            dsCfg.setWalPath(wal.getAbsolutePath());
+
+            dsCfg.setWalArchivePath(new File(wal, "archive").getAbsolutePath());
+        }
+
+        dsCfg.setWalMode(customWalMode != null ? customWalMode : WALMode.LOG_ONLY)
+            .setWalHistorySize(1)
+            .setWalSegments(10);
+
+        if (walSegmentSize != 0)
+            dsCfg.setWalSegmentSize(walSegmentSize);
+
+        cfg.setDataStorageConfiguration(dsCfg);
+
+        cfg.setBinaryConfiguration(new BinaryConfiguration().setCompactFooter(false));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        // Clean both the persistent store ("db") and the watchdog report dir ("temp") for a fresh run.
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "temp", false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Persistent files are intentionally left on disk for post-mortem; cleanup happens in beforeTest().
+        stopAllGrids();
+    }
+
+    /**
+     * Runs multithreaded put scenario (no data streamer). Load is generated to page store and to WAL.
+     * @throws Exception if failed.
+     */
+    public void testContinuousPutMultithreaded() throws Exception {
+        try {
+            // System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_PARALLEL, "true");
+            // System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_SORTED_STORAGE, "true");
+            System.setProperty(IgniteSystemProperties.IGNITE_USE_ASYNC_FILE_IO_FACTORY, "false");
+            System.setProperty(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED, "speed");
+
+            setWalArchAndWorkToSameVal = true;
+
+            customWalMode = WALMode.BACKGROUND;
+
+            final IgniteEx ignite = startGrid(1);
+
+            ignite.active(true);
+
+            final IgniteCache<Object, HugeIndexedObject> cache = ignite.cache(CACHE_NAME);
+            final int threads = Runtime.getRuntime().availableProcessors();
+
+            final int recsPerThread = CONTINUOUS_PUT_RECS_CNT / threads;
+
+            final Collection<Callable<?>> tasks = new ArrayList<>();
+
+            final ProgressWatchdog watchdog = new ProgressWatchdog(ignite, "put", PUT_THREAD);
+
+            for (int j = 0; j < threads; j++) {
+                final int finalJ = j;
+
+                tasks.add(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        for (int i = finalJ * recsPerThread; i < ((finalJ + 1) * recsPerThread); i++) {
+                            HugeIndexedObject v = new HugeIndexedObject(i);
+                            cache.put(i, v);
+                            watchdog.reportProgress(1);
+                        }
+                        return null;
+                    }
+                });
+            }
+
+            watchdog.start();
+            GridTestUtils.runMultiThreaded(tasks, PUT_THREAD);
+
+            watchdog.stopping();
+            stopGrid(1);
+
+            watchdog.stop();
+
+            if (VERIFY_STORAGE)
+                runVerification(threads, recsPerThread);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Runs multithreaded put scenario (no data streamer). Load is generated to page store and to WAL.
+     * @throws Exception if failed.
+     */
+    public void testDataStreamerContinuousPutMultithreaded() throws Exception {
+        try {
+            // System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_PARALLEL, "true");
+            // System.setProperty(IgniteSystemProperties.IGNITE_DIRTY_PAGES_SORTED_STORAGE, "true");
+            System.setProperty(IgniteSystemProperties.IGNITE_USE_ASYNC_FILE_IO_FACTORY, "false");
+            System.setProperty(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED, "speed");
+
+            setWalArchAndWorkToSameVal = true;
+
+            customWalMode = WALMode.BACKGROUND;
+
+            final IgniteEx ignite = startGrid(1);
+
+            ignite.active(true);
+
+            final int threads = 1; Runtime.getRuntime().availableProcessors();
+
+            final int recsPerThread = CONTINUOUS_PUT_RECS_CNT / threads;
+
+            final ProgressWatchdog watchdog = new ProgressWatchdog(ignite, "put", PUT_THREAD);
+
+            IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(CACHE_NAME);
+
+            streamer.perNodeBufferSize(12);
+
+            final Collection<Callable<?>> tasks = new ArrayList<>();
+            for (int j = 0; j < threads; j++) {
+                final int finalJ = j;
+
+                tasks.add((Callable<Void>)() -> {
+                    for (int i = finalJ * recsPerThread; i < ((finalJ + 1) * recsPerThread); i++)
+                        streamer.addData(i, new HugeIndexedObject(i));
+
+                    return null;
+                });
+            }
+
+            final IgniteCache<Object, HugeIndexedObject> cache = ignite.cache(CACHE_NAME);
+            ScheduledExecutorService svcReport = Executors.newScheduledThreadPool(1);
+
+            AtomicInteger size = new AtomicInteger();
+            svcReport.scheduleAtFixedRate(
+                () -> {
+                    int newSize = cache.size();
+                    int oldSize = size.getAndSet(newSize);
+
+                    watchdog.reportProgress(newSize - oldSize);
+                },
+                250, 250, TimeUnit.MILLISECONDS);
+
+            watchdog.start();
+            GridTestUtils.runMultiThreaded(tasks, PUT_THREAD);
+            streamer.close();
+
+            watchdog.stopping();
+            stopGrid(1);
+
+            watchdog.stop();
+
+            ProgressWatchdog.stopPool(svcReport);
+
+            if (VERIFY_STORAGE)
+                runVerification(threads, recsPerThread);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+
+    /**
+     * Verifies data from storage: restarts grid node 1 and checks that every loaded key maps to an object
+     * whose {@code iVal} equals that key. Leaves the restarted grid running for the caller to stop.
+     *
+     * @param threads threads count used during load.
+     * @param recsPerThread record per thread loaded.
+     * @throws Exception if failed
+     */
+    private void runVerification(int threads, final int recsPerThread) throws Exception {
+        final Ignite restartedIgnite = startGrid(1);
+
+        restartedIgnite.active(true);
+
+        final IgniteCache<Integer, HugeIndexedObject> restartedCache = restartedIgnite.cache(CACHE_NAME);
+
+        final ProgressWatchdog watchdog2 = new ProgressWatchdog(restartedIgnite, "get", GET_THREAD);
+
+        final Collection<Callable<?>> tasksR = new ArrayList<>();
+        tasksR.clear(); // NOTE(review): clear() on a freshly created list is redundant.
+        for (int j = 0; j < threads; j++) {
+            final int finalJ = j;
+            tasksR.add(new Callable<Void>() {
+                @Override public Void call() {
+                    // Read back the same disjoint key range this thread index wrote during load.
+                    for (int i = finalJ * recsPerThread; i < ((finalJ + 1) * recsPerThread); i++) {
+                        HugeIndexedObject obj = restartedCache.get(i);
+                        int actVal = obj.iVal;
+                        TestCase.assertEquals(i, actVal);
+                        watchdog2.reportProgress(1);
+                    }
+                    return null;
+                }
+            });
+        }
+
+        watchdog2.start();
+        GridTestUtils.runMultiThreaded(tasksR, GET_THREAD);
+        watchdog2.stop();
+    }
+
+    /**
+     * Verifies a contiguous key range chunk-by-chunk using bulk {@code getAll()} reads.
+     * Each stored object encodes its own key, so the value must match the key it is stored under.
+     *
+     * @param threads Threads count used during load.
+     * @param recsPerThread initial records per thread.
+     * @param restartedCache cache to obtain data from.
+     */
+    private void verifyByChunk(int threads, int recsPerThread, Cache<Integer, HugeIndexedObject> restartedCache) {
+        int verifyChunk = 100;
+
+        int totalRecsToVerify = recsPerThread * threads;
+        int chunks = totalRecsToVerify / verifyChunk;
+
+        for (int c = 0; c < chunks; c++) {
+            Set<Integer> keys = new TreeSet<>();
+
+            for (int i = 0; i < verifyChunk; i++)
+                keys.add(i + c * verifyChunk);
+
+            Map<Integer, HugeIndexedObject> values = restartedCache.getAll(keys);
+
+            for (Map.Entry<Integer, HugeIndexedObject> entry : values.entrySet()) {
+                int key = entry.getKey();
+
+                TestCase.assertEquals(key, entry.getValue().iVal);
+
+                if (key % 1000 == 0)
+                    X.println(" >> Verified: " + key);
+            }
+        }
+    }
+
+    /**
+     * Selects the sparse subset of entries (one in 1777) that survive batch removal and must be
+     * present in the store after restart.
+     *
+     * @param id entry id.
+     * @return {@code True} if need to keep entry in DB and checkpoint it. Most of entries not required.
+     */
+    private static boolean keepInDb(int id) {
+        return id % 1777 == 0;
+    }
+
+    /**
+     * Runs multithreaded put-remove scenario (no data streamer). Load is generated to WAL log mostly.
+     * Most of entries generated will be removed before first checkpoint.
+     *
+     * @throws Exception if failed.
+     */
+    public void testPutRemoveMultithreaded() throws Exception {
+        setWalArchAndWorkToSameVal = false;
+        customWalMode = WALMode.LOG_ONLY;
+
+        try {
+            final IgniteEx ignite = startGrid(1);
+
+            ignite.active(true);
+            checkpointFrequency = 20 * 1000;
+            final IgniteCache<Object, HugeIndexedObject> cache = ignite.cache(CACHE_NAME);
+            int totalRecs = 400_000;
+            final int threads = 10;
+            final int recsPerThread = totalRecs / threads;
+            final Collection<Callable<?>> tasks = new ArrayList<>();
+            final ProgressWatchdog watchdog = new ProgressWatchdog(ignite, "put", PUT_THREAD);
+
+            for (int j = 0; j < threads; j++) {
+                final int finalJ = j;
+
+                tasks.add(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        final Collection<Integer> toRmvLaterList = new ArrayList<>();
+
+                        for (int id = finalJ * recsPerThread; id < ((finalJ + 1) * recsPerThread); id++) {
+                            HugeIndexedObject v = new HugeIndexedObject(id);
+
+                            cache.put(id, v);
+                            toRmvLaterList.add(id);
+                            watchdog.reportProgress(1);
+
+                            if (toRmvLaterList.size() > 100) {
+                                for (Integer toRemoveId : toRmvLaterList) {
+                                    if (keepInDb(toRemoveId))
+                                        continue;
+
+                                    boolean rmv = cache.remove(toRemoveId);
+                                    assert rmv : "Expected to remove object from cache " + toRemoveId;
+                                }
+                                toRmvLaterList.clear();
+                            }
+                        }
+                        return null;
+                    }
+                });
+            }
+
+            watchdog.start();
+            GridTestUtils.runMultiThreaded(tasks, PUT_THREAD);
+            watchdog.stop();
+            stopGrid(1);
+
+            final Ignite restartedIgnite = startGrid(1);
+
+            restartedIgnite.active(true);
+
+            final IgniteCache<Object, HugeIndexedObject> restartedCache = restartedIgnite.cache(CACHE_NAME);
+
+            for (int i = 0; i < recsPerThread * threads; i++) {
+                if (keepInDb(i)) {
+                    final HugeIndexedObject obj = restartedCache.get(i);
+
+                    TestCase.assertNotNull(obj);
+                    TestCase.assertEquals(i, obj.iVal);
+                }
+
+                if (i % 1000 == 0)
+                    X.print(" V: " + i);
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        // Sandbox load runs are long; allow up to 20 minutes instead of the framework default.
+        return TimeUnit.MINUTES.toMillis(20);
+    }
+
+    /** Object with additional 40 000 bytes of payload */
+    public static class HugeIndexedObject   {
+        /** Data. */
+        private byte[] data;
+        /** */
+        @QuerySqlField(index = true)
+        private int iVal;
+
+        /**
+         * @param iVal Integer value.
+         */
+        private HugeIndexedObject(int iVal) {
+            this.iVal = iVal;
+
+            int sz = OBJECT_SIZE;
+
+            data = new byte[sz];
+            for (int i = 0; i < sz; i++)
+                data[i] = (byte)('A' + (i % 10));
+        }
+
+        /**
+         * @return Data.
+         */
+        public byte[] data() {
+            return data;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof HugeIndexedObject))
+                return false;
+
+            HugeIndexedObject that = (HugeIndexedObject)o;
+
+            return iVal == that.iVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return iVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(HugeIndexedObject.class, this);
+        }
+    }
+}
\ No newline at end of file
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/ProgressWatchdog.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/ProgressWatchdog.java
new file mode 100644 (file)
index 0000000..55ca2a9
--- /dev/null
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.db.checkpoint;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.ignite.DataRegionMetrics;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PagesWriteSpeedBasedThrottle;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Process watchdog for ignite instance. Detects gaps in progress, prints and saves summary of execution metrics to
+ * file. Client implementation should use {@link ProgressWatchdog#reportProgress(int)} to show how much data was
+ * processed at client side. Long absence of these calls will cause thread dumps to be generated.
+ */
+class ProgressWatchdog {
+    public static final int CHECK_PERIOD_MSEC = 1000;
+    /** Progress counter, Overall records processed. */
+    private final LongAdder overallRecordsProcessed = new LongAdder();
+
+    /** Metrics log Txt file writer. */
+    private final FileWriter txtWriter;
+
+    /** Client threads name, included into thread dumps. */
+    private String clientThreadsName;
+
+    /** Service for scheduled execution of watchdog. */
+    private ScheduledExecutorService svc = Executors.newScheduledThreadPool(1);
+
+    /** Operation name for messages and log. */
+    private final String operation;
+
+    /** Ignite instance. */
+    private Ignite ignite;
+
+    /** Stopping flag, indicates ignite close was called but stop is not finished. */
+    private volatile boolean stopping;
+
+    /** Value of records count at previous tick, see {@link #overallRecordsProcessed} */
+    private final AtomicLong prevRecordsCnt = new AtomicLong();
+
+    /** Milliseconds elapsed at previous tick. */
+    private final AtomicLong prevMsElapsed = new AtomicLong();
+
+    /** Checkpoint written pages at previous tick. */
+    private final AtomicLong prevCpWrittenPages = new AtomicLong();
+
+    /** Checkpoint fsync()'ed pages at previous tick. */
+    private final AtomicLong prevCpSyncedPages = new AtomicLong();
+
+    /** WAL pointer at previous tick reference. */
+    private final AtomicReference<FileWALPointer> prevWalPtrRef = new AtomicReference<>();
+
+    /** Milliseconds at start of watchdog execution. */
+    private long msStart;
+
+    /**
+     * Creates watchdog.
+     *
+     * @param ignite Ignite.
+     * @param operation Operation name for log.
+     * @param clientThreadsName Client threads name.
+     * @throws IgniteCheckedException If the temp directory for the report cannot be resolved.
+     * @throws IOException If the metrics log file cannot be opened for writing.
+     */
+    ProgressWatchdog(Ignite ignite, String operation,
+        String clientThreadsName) throws IgniteCheckedException, IOException {
+        this.ignite = ignite;
+        this.operation = operation;
+        txtWriter = new FileWriter(new File(getTempDirFile(), "watchdog-" + operation + ".txt"));
+        this.clientThreadsName = clientThreadsName;
+        // Header row of the tab-separated metrics log; column order must match the values written each tick.
+        line("sec",
+            "cur." + operation + "/sec",
+            "WAL speed, MB/s.",
+            "cp. speed, MB/sec",
+            "cp. sync., MB/sec",
+            "WAL work seg.",
+            "Throttle part time",
+            "curDirtyRatio",
+            "targetDirtyRatio",
+            "throttleWeigth",
+            "markDirtySpeed",
+            "cpWriteSpeed",
+            "estMarkAllSpeed",
+            "avg." + operation + "/sec",
+            "dirtyPages",
+            "cpWrittenPages",
+            "cpSyncedPages",
+            "cpEvictedPages",
+            "WAL idx",
+            "Arch. idx",
+            "WAL Archive seg.");
+    }
+
+    /**
+     * @return temp dir to place watchdog report.
+     * @throws IgniteCheckedException if failed.
+     */
+    @NotNull private static File getTempDirFile() throws IgniteCheckedException {
+        File tempDir = new File(U.defaultWorkDirectory(), "temp");
+
+        if (!tempDir.exists())
+            tempDir.mkdirs();
+
+        return tempDir;
+    }
+
+    /**
+     * Generates limited thread dump with only threads involved into persistence.
+     *
+     * @return string to log.
+     */
+    private String generateThreadDump() {
+        final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+        int depth = 100;
+        final ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), depth);
+        final StringBuilder dump = new StringBuilder();
+
+        for (ThreadInfo threadInfo : threadInfos) {
+            String name = threadInfo.getThreadName();
+
+            // Persistence and client threads are selected by well-known thread name fragments.
+            if (name.contains("checkpoint-runner")
+                || name.contains("db-checkpoint-thread")
+                || name.contains("wal-file-archiver")
+                || name.contains("data-streamer")
+                || (clientThreadsName!=null && name.contains(clientThreadsName))) {
+                String str = threadInfo.toString();
+
+                // The checkpointer's trace is printed manually because ThreadInfo.toString() limits frame count.
+                if (name.contains("db-checkpoint-thread")) {
+                    dump.append(str);
+                    dump.append("(Full stacktrace)");
+
+                    StackTraceElement[] stackTrace = threadInfo.getStackTrace();
+                    int i = 0;
+
+                    for (; i < stackTrace.length && i < depth; i++) {
+                        StackTraceElement ste = stackTrace[i];
+                        dump.append("\tat ").append(ste.toString());
+                        dump.append('\n');
+                    }
+
+                    if (i < stackTrace.length) {
+                        dump.append("\t...");
+                        dump.append('\n');
+                    }
+
+                    dump.append('\n');
+                }
+                else
+                    dump.append(str);
+            }
+            else {
+                String s = threadInfo.toString();
+
+                // Other threads are included only if their trace mentions WAL or page store manager classes.
+                if (s.contains(FileWriteAheadLogManager.class.getSimpleName())
+                    || s.contains(FilePageStoreManager.class.getSimpleName()))
+                    dump.append(s);
+            }
+        }
+        return dump.toString();
+    }
+
+    /**
+     * Adds a tab-separated line to txt log {@link #txtWriter}.
+     *
+     * @param parms values to log
+     */
+    private void line(Object... parms) {
+        try {
+            for (int i = 0; i < parms.length; i++) {
+                Object parm = parms[i];
+                String delim = (i < parms.length - 1) ? "\t" : "\n";
+
+                txtWriter.write(parm + delim);
+            }
+
+            txtWriter.flush();
+        }
+        catch (IOException ignored) {
+            // Metrics logging is best-effort: a failed write must not fail the test being watched.
+        }
+    }
+
+    /**
+     * Starts watchdog execution: resets baseline counters and schedules the periodic check
+     * every {@link #CHECK_PERIOD_MSEC} ms.
+     */
+    public void start() {
+        msStart = U.currentTimeMillis();
+        prevMsElapsed.set(0);
+        prevRecordsCnt.set(0);
+        prevCpWrittenPages.set(0);
+        prevCpSyncedPages.set(0);
+        prevWalPtrRef.set(null);
+
+        svc.scheduleAtFixedRate(
+            this::tick,
+            CHECK_PERIOD_MSEC, CHECK_PERIOD_MSEC, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Regular method printing statistics to out and to log. Checks gaps in progress.
+     */
+    private void tick() {
+        long elapsedMs = U.currentTimeMillis() - msStart;
+        final long totalCnt = overallRecordsProcessed.longValue();
+        long elapsedMsFromPrevTick = elapsedMs - prevMsElapsed.getAndSet(elapsedMs);
+        if (elapsedMsFromPrevTick == 0)
+            return;
+
+        final long currPutPerSec = ((totalCnt - prevRecordsCnt.getAndSet(totalCnt)) * 1000) / elapsedMsFromPrevTick;
+        final long averagePutPerSec = totalCnt * 1000 / elapsedMs;
+        boolean slowProgress = currPutPerSec < averagePutPerSec / 10 && !stopping;
+        final String fileNameWithDump = slowProgress ? reactNoProgress() : "";
+
+        DataStorageConfiguration dsCfg = ignite.configuration().getDataStorageConfiguration();
+
+        String defRegName = dsCfg.getDefaultDataRegionConfiguration().getName();
+        long dirtyPages = -1;
+        for (DataRegionMetrics m : ignite.dataRegionMetrics())
+            if (m.getName().equals(defRegName))
+                dirtyPages = m.getDirtyPages();
+
+        GridCacheSharedContext<Object, Object> cacheSctx = null;
+        PageMemoryImpl pageMemory = null;
+        try {
+            cacheSctx = ((IgniteEx)ignite).context().cache().context();
+            pageMemory = (PageMemoryImpl)cacheSctx.database().dataRegion(defRegName).pageMemory();
+        }
+        catch (IgniteCheckedException e) {
+            e.printStackTrace();
+        }
+
+        long cpBufPages = 0;
+
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)(cacheSctx.database());
+        AtomicInteger wrPageCntr = db.writtenPagesCounter();
+        long cpWrittenPages = wrPageCntr == null ? 0 : wrPageCntr.get();
+
+        AtomicInteger syncedPagesCntr = db.syncedPagesCounter();
+        int cpSyncedPages = syncedPagesCntr == null ? 0 : syncedPagesCntr.get();
+
+        AtomicInteger evictedPagesCntr = db.evictedPagesCntr();
+        int cpEvictedPages = evictedPagesCntr == null ? 0 : evictedPagesCntr.get();
+
+        int pageSize = pageMemory == null ? 0 : pageMemory.pageSize();
+
+        String cpWriteSpeed = getMBytesPrintable(
+            detectDelta(elapsedMsFromPrevTick, cpWrittenPages, prevCpWrittenPages) * pageSize);
+
+        String cpSyncSpeed = getMBytesPrintable(
+            detectDelta(elapsedMsFromPrevTick, cpSyncedPages, prevCpSyncedPages) * pageSize);
+
+        String walSpeed = "";
+        long throttleParkTimeNanos = 0;
+        double curDirtyRatio = 0.0;
+        String targetDirtyRatioStr = "";
+        double closeToThrottle = 0.0;
+        long idx = -1;
+        long lastArchIdx = -1;
+        int walArchiveSegments = 0;
+        long walWorkSegments = 0;
+        long markDirtySpeed = 0;
+        long cpWriteSpeedInPages = 0;
+        long estWrAllSpeed = 0;
+
+        try {
+            if (pageMemory != null) {
+                cpBufPages = pageMemory.checkpointBufferPagesCount();
+
+                PagesWriteSpeedBasedThrottle throttle = U.field(pageMemory, "writeThrottle");
+
+                if (throttle != null) {
+                    curDirtyRatio = throttle.getCurrDirtyRatio();
+
+                    double targetDirtyRatio = throttle.getTargetDirtyRatio();
+
+                    targetDirtyRatioStr = targetDirtyRatio < 0 ? "" : formatDbl(targetDirtyRatio);
+
+                    closeToThrottle = throttle.throttleWeight();
+                    throttleParkTimeNanos = throttle.throttleParkTime();
+                    markDirtySpeed = throttle.getMarkDirtySpeed();
+                    cpWriteSpeedInPages = throttle.getCpWriteSpeed();
+                    estWrAllSpeed = throttle.getLastEstimatedSpeedForMarkAll();
+                    if (estWrAllSpeed > 99_999)
+                        estWrAllSpeed = 99_999;
+                }
+            }
+
+            FileWriteAheadLogManager wal = (FileWriteAheadLogManager)cacheSctx.wal();
+
+            idx = 0;
+            lastArchIdx = 0;
+
+            walArchiveSegments = wal.walArchiveSegments();
+            walWorkSegments = idx - lastArchIdx;
+
+            /* // uncomment when currentWritePointer is available
+             FileWALPointer ptr = wal.currentWritePointer();
+               FileWALPointer prevWalPtr = this.prevWalPtrRef.getAndSet(ptr);
+
+               if (prevWalPtr != null) {
+                   long idxDiff = ptr.index() - prevWalPtr.index();
+                   long offDiff = ptr.fileOffset() - prevWalPtr.fileOffset();
+                   long bytesDiff = idxDiff * maxWalSegmentSize + offDiff;
+
+                   long bytesPerSec = (bytesDiff * 1000) / elapsedMsFromPrevTick;
+
+                   walSpeed = getMBytesPrintable(bytesPerSec);
+               } else
+                   walSpeed = "0";
+             */
+
+            walSpeed = "0";
+        }
+        catch (Exception e) {
+            X.error(e.getClass().getSimpleName() + ":" + e.getMessage());
+        }
+
+        long elapsedSecs = elapsedMs / 1000;
+        X.println(" >> " +
+            operation +
+            " done: " + totalCnt + "/" + elapsedSecs + "s, " +
+            "Cur. " + operation + " " + currPutPerSec + " recs/sec " +
+            "cpWriteSpeed=" + cpWriteSpeed + " " +
+            "cpSyncSpeed=" + cpSyncSpeed + " " +
+            "walSpeed= " + walSpeed + " " +
+            "walWorkSeg.="+walWorkSegments + " " +
+            "markDirtySpeed=" + markDirtySpeed +" " +
+            "Avg. " + operation + " " + averagePutPerSec + " recs/sec, " +
+            "dirtyP=" + dirtyPages + ", " +
+            "cpWrittenP.=" + cpWrittenPages + ", " +
+            "cpBufP.=" + cpBufPages + " " +
+            "threshold=" + targetDirtyRatioStr + " " +
+            "walIdx=" + idx + " " +
+            "archWalIdx=" + lastArchIdx + " " +
+            "walArchiveSegments=" + walArchiveSegments + " " +
+            fileNameWithDump);
+
+        line(elapsedSecs,
+            currPutPerSec,
+            walSpeed,
+            cpWriteSpeed,
+            cpSyncSpeed,
+            walWorkSegments,
+            throttleParkTimeNanos,
+            formatDbl(curDirtyRatio),
+            targetDirtyRatioStr,
+            formatDbl(closeToThrottle),
+            markDirtySpeed,
+            cpWriteSpeedInPages,
+            estWrAllSpeed,
+            averagePutPerSec,
+            dirtyPages,
+            cpWrittenPages,
+            cpSyncedPages,
+            cpEvictedPages,
+            idx,
+            lastArchIdx,
+            walArchiveSegments
+        );
+    }
+
+    /**
+     * Formats a double with two decimal places using a locale-independent
+     * decimal point, so the txt/CSV log parses identically on any JVM locale.
+     *
+     * @param val value to log.
+     * @return formatted value for txt log.
+     */
+    private String formatDbl(double val) {
+        // Locale.US guarantees '.' as the decimal separator; this replaces the
+        // fragile post-hoc replace(",", ".") needed under e.g. a German locale.
+        return String.format(java.util.Locale.US, "%.2f", val);
+    }
+
+    /**
+     * Converts a byte counter into megabytes rendered as printable text.
+     *
+     * @param currBytesWritten bytes.
+     * @return string with megabytes as printable string.
+     */
+    private String getMBytesPrintable(long currBytesWritten) {
+        double megabytes = currBytesWritten / (1024.0 * 1024.0);
+
+        return formatDbl(megabytes);
+    }
+
+    /**
+     * Computes the per-second rate of change of a monotonic counter since the
+     * previous tick.
+     *
+     * @param elapsedMsFromPrevTick time from previous tick, millis.
+     * @param absVal current value.
+     * @param cnt counter stores previous value.
+     * @return value change from previous tick, scaled to units per second.
+     */
+    private long detectDelta(long elapsedMsFromPrevTick, long absVal, AtomicLong cnt) {
+        long cpPagesChange = absVal - cnt.getAndSet(absVal);
+
+        // Counters may be reset between checkpoints; treat a negative delta as no progress.
+        if (cpPagesChange < 0)
+            cpPagesChange = 0;
+
+        // Guard against division by zero on a zero-length interval (first tick or
+        // millisecond clock granularity); previously this threw ArithmeticException.
+        if (elapsedMsFromPrevTick <= 0)
+            return 0;
+
+        return (cpPagesChange * 1000) / elapsedMsFromPrevTick;
+    }
+
+    /**
+     * Reaction to detected lack of progress: dumps all thread stacks into a
+     * file under the temp directory for later diagnosis.
+     *
+     * @return file name with dump created, or empty string if writing failed.
+     */
+    private String reactNoProgress() {
+        try {
+            String threadDump = generateThreadDump();
+
+            // Seconds elapsed since watchdog start, used to label the dump file.
+            long sec = TimeUnit.MILLISECONDS.toSeconds(U.currentTimeMillis() - msStart);
+
+            String fileName = "dumpAt" + sec + "second.txt";
+
+            // Tag the dump when a thread mentions the checkpoint lock - the most
+            // common suspect for stalled progress under heavy persistent load.
+            if (threadDump.contains(IgniteCacheDatabaseSharedManager.class.getName() + ".checkpointLock"))
+                fileName = "checkpoint_" + fileName;
+
+            fileName = operation + fileName;
+
+            try (FileWriter writer = new FileWriter(new File(getTempDirFile(), fileName))) {
+                writer.write(threadDump);
+            }
+
+            return fileName;
+        }
+        catch (IOException | IgniteCheckedException e) {
+            // Best-effort diagnostics in a test watchdog: report and continue.
+            e.printStackTrace();
+        }
+        return "";
+    }
+
+    /**
+     * Reports client-side progress to the watchdog. Presumably safe to call from
+     * multiple load threads (the accumulator's add() suggests a LongAdder-style
+     * counter - declaration is outside this view, confirm).
+     *
+     * @param cnt counter of entries/operations processed since last call.
+     */
+    public void reportProgress(int cnt) {
+        overallRecordsProcessed.add(cnt);
+    }
+
+    /**
+     * Call this method to indicate client operation is done, and ignite is stopping.
+     * NOTE(review): the flag is written by a client thread and read by the watchdog
+     * thread - confirm the 'stopping' field is declared volatile.
+     */
+    public void stopping() {
+        stopping = true;
+    }
+
+    /**
+     * Stops watchdog threads.
+     */
+    public void stop() {
+        U.closeQuiet(txtWriter);
+
+        ScheduledExecutorService pool = this.svc;
+        stopPool(pool);
+    }
+
+    public static void stopPool(ExecutorService pool) {
+        pool.shutdown();
+        try {
+            pool.awaitTermination(10, TimeUnit.SECONDS);
+        }
+        catch (InterruptedException e) {
+            e.printStackTrace();
+            Thread.currentThread().interrupt();
+        }
+    }
+
+}
index 13e811c..1b6fde3 100644 (file)
@@ -84,7 +84,7 @@ public class BPlusTreePageMemoryImplTest extends BPlusTreeSelfTest {
                 }
             },
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            false
+            PageMemoryImpl.ThrottlingPolicy.NONE
         );
 
         mem.start();
index ce5deba..1180164 100644 (file)
@@ -84,7 +84,7 @@ public class BPlusTreeReuseListPageMemoryImplTest extends BPlusTreeReuseSelfTest
                 }
             },
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            false
+            PageMemoryImpl.ThrottlingPolicy.NONE
         );
 
         mem.start();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgniteThrottlingUnitTest.java
new file mode 100644 (file)
index 0000000..054696c
--- /dev/null
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.logger.NullLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+public class IgniteThrottlingUnitTest {
+    /** Logger. */
+    private IgniteLogger log = new NullLogger();
+
+    /** Page memory 2 g. */
+    private PageMemoryImpl pageMemory2g = mock(PageMemoryImpl.class);
+
+    {
+        when(pageMemory2g.totalPages()).thenReturn((2L * 1024 * 1024 * 1024) / 4096);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void breakInCaseTooFast() {
+        PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+
+        long time = throttle.getParkTime(0.67,
+            (362584 + 67064) / 2,
+            328787,
+            1,
+            60184,
+            23103);
+
+        assertTrue(time > 0);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void noBreakIfNotFastWrite() {
+        PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+
+        long time = throttle.getParkTime(0.47,
+            ((362584 + 67064) / 2),
+            328787,
+            1,
+            20103,
+            23103);
+
+        assertTrue(time == 0);
+    }
+
+    /**
+     * @throws InterruptedException if interrupted.
+     */
+    @Test
+    public void averageCalculation() throws InterruptedException {
+        IntervalBasedMeasurement measurement = new IntervalBasedMeasurement(100, 1);
+
+        for (int i = 0; i < 1000; i++)
+            measurement.addMeasurementForAverageCalculation(100);
+
+        assertEquals(100, measurement.getAverage());
+
+        Thread.sleep(220);
+
+        assertEquals(0, measurement.getAverage());
+
+        assertEquals(0, measurement.getSpeedOpsPerSec(System.nanoTime()));
+    }
+
+    /**
+     * @throws InterruptedException if interrupted.
+     */
+    @Test
+    public void speedCalculation() throws InterruptedException {
+        IntervalBasedMeasurement measurement = new IntervalBasedMeasurement(100, 1);
+
+        for (int i = 0; i < 1000; i++)
+            measurement.setCounter(i, System.nanoTime());
+
+        long speed = measurement.getSpeedOpsPerSec(System.nanoTime());
+        System.out.println("speed measured " + speed);
+        assertTrue(speed > 1000);
+
+        Thread.sleep(230);
+
+        assertEquals(0, measurement.getSpeedOpsPerSec(System.nanoTime()));
+    }
+
+    /**
+     * @throws InterruptedException if interrupted.
+     */
+    @Test
+    public void speedWithDelayCalculation() throws InterruptedException {
+        IntervalBasedMeasurement measurement = new IntervalBasedMeasurement(100, 1);
+
+        int runs = 10;
+        int nanosPark = 100;
+        int multiplier = 100000;
+        for (int i = 0; i < runs; i++) {
+            measurement.setCounter(i * multiplier, System.nanoTime());
+
+            LockSupport.parkNanos(nanosPark);
+        }
+
+        long speed = measurement.getSpeedOpsPerSec(System.nanoTime());
+
+        assertTrue(speed > 0);
+        long maxSpeed = (TimeUnit.SECONDS.toNanos(1) * multiplier * runs) / ((long)(runs * nanosPark));
+        assertTrue(speed < maxSpeed);
+
+        Thread.sleep(200);
+
+        assertEquals(0, measurement.getSpeedOpsPerSec(System.nanoTime()));
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void beginOfCp() {
+        PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+
+        assertTrue(throttle.getParkTime(0.01, 100,400000,
+            1,
+            20103,
+            23103) == 0);
+
+        //mark speed 22413 for mark all remaining as dirty
+        long time = throttle.getParkTime(0.024, 100, 400000,
+            1,
+            24000,
+            23103);
+        assertTrue(time > 0);
+
+        assertTrue(throttle.getParkTime(0.01,
+            100,
+            400000,
+            1,
+            22412,
+            23103) == 0);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void enforceThrottleAtTheEndOfCp() {
+        PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+
+        long time1 = throttle.getParkTime(0.70, 300000, 400000,
+            1, 20200, 23000);
+        long time2 = throttle.getParkTime(0.71, 300000, 400000,
+            1, 20200, 23000);
+
+        assertTrue(time2 >= time1 * 2); // extra slowdown should be applied.
+
+        long time3 = throttle.getParkTime(0.73, 300000, 400000,
+            1, 20200, 23000);
+        long time4 = throttle.getParkTime(0.74, 300000, 400000,
+            1, 20200, 23000);
+
+        assertTrue(time3 > time2);
+        assertTrue(time4 > time3);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void tooMuchPagesMarkedDirty() {
+        PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, null, log);
+
+       // 363308       350004  348976  10604
+        long time = throttle.getParkTime(0.75,
+            ((350004 + 348976) / 2),
+            350004-10604,
+            4,
+            279,
+            23933);
+
+        System.err.println(time);
+
+        assertTrue(time == 0);
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void warningInCaseTooMuchThrottling() {
+        AtomicInteger warnings = new AtomicInteger(0);
+        IgniteLogger log = mock(IgniteLogger.class);
+
+        doAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+
+            System.out.println("log.info() called with arguments: " + Arrays.toString(args));
+
+            warnings.incrementAndGet();
+
+            return null;
+        }).when(log).info(anyString());
+
+        AtomicInteger written = new AtomicInteger();
+        GridCacheDatabaseSharedManager db = mock(GridCacheDatabaseSharedManager.class);
+        when(db.checkpointLockIsHeldByThread()).thenReturn(true);
+        when(db.writtenPagesCounter()).thenReturn(written);
+
+        PagesWriteSpeedBasedThrottle throttle = new PagesWriteSpeedBasedThrottle(pageMemory2g, db, log) {
+            @Override protected void doPark(long throttleParkTimeNs) {
+                //do nothing
+            }
+        };
+        throttle.onBeginCheckpoint();
+        written.set(200); //emulating some pages written
+
+        for (int i = 0; i < 100000; i++) {
+            //emulating high load on marking
+            throttle.onMarkDirty(false);
+
+            if (throttle.throttleWeight() > PagesWriteSpeedBasedThrottle.WARN_THRESHOLD)
+                break;
+        }
+
+        for (int i = 0; i < 1000; i++) {
+            //emulating additional page writes to be sure log message is generated
+
+            throttle.onMarkDirty(false);
+
+            if(warnings.get()>0)
+                break;
+        }
+
+        System.out.println(throttle.throttleWeight());
+
+        assertTrue(warnings.get() > 0);
+    }
+}
index effbe7e..04f3bd0 100644 (file)
@@ -99,7 +99,7 @@ public class IndexStoragePageMemoryImplTest extends IndexStorageSelfTest {
                 }
             },
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            false
+            PageMemoryImpl.ThrottlingPolicy.NONE
         );
     }
 }
index 3d806b1..ed285c5 100644 (file)
@@ -90,7 +90,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
                 }
             },
             new DataRegionMetricsImpl(new DataRegionConfiguration()),
-            false
+            PageMemoryImpl.ThrottlingPolicy.NONE
         );
     }
 
index 5c91c59..1369f28 100644 (file)
@@ -122,7 +122,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest {
                 }
             },
             new DataRegionMetricsImpl(igniteCfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration()),
-            false
+            PageMemoryImpl.ThrottlingPolicy.NONE
         );
 
         mem.start();
index a7c549d..f957aec 100644 (file)
@@ -19,7 +19,6 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistence;
-import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsClientNearCachePutGetTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDynamicCacheTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNodePutGetPersistenceTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest;
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java
new file mode 100644 (file)
index 0000000..a2bfeb3
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.testsuites;
+
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.IgniteThrottlingUnitTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Suite of fast, mock-based persistent store unit tests that do not start
+ * an Ignite node.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    IgniteThrottlingUnitTest.class
+})
+public class IgnitePdsUnitTestSuite {
+}
index 22bd610..0546459 100644 (file)
@@ -30,12 +30,16 @@ import junit.framework.TestSuite;
  * This suite is not included into main build
  */
 public class IgniteReproducingSuite extends TestSuite {
+    /**
+     * @return suite with test(s) for reproduction some problem.
+     * @throws Exception if failed.
+     */
     public static TestSuite suite() throws Exception {
         TestSuite suite = new TestSuite("Ignite Issue Reproducing Test Suite");
 
         //uncomment to add some test
         //for (int i = 0; i < 100; i++)
-        //    suite.addTestSuite();
+        //    suite.addTestSuite(IgniteCheckpointDirtyPagesForLowLoadTest.class);
 
         return suite;
     }