IGNITE-7017 Added an option to write WAL directly to the archive directory - Fixes...
authordpavlov <dpavlov@gridgain.com>
Mon, 29 Jan 2018 14:44:11 +0000 (17:44 +0300)
committerAlexey Goncharuk <alexey.goncharuk@gmail.com>
Mon, 29 Jan 2018 14:44:11 +0000 (17:44 +0300)
Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>
modules/core/src/main/java/org/apache/ignite/configuration/DataRegionConfiguration.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java

index 2950e9c..52cb764 100644 (file)
@@ -57,7 +57,7 @@ import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_DATA
  *                      <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
  *                          <property name="name" value="25MB_Region_Swapping"/>
  *                          <property name="initialSize" value="#{25 * 1024 * 1024}"/>
- *                          <property name="initialSize" value="#{100 * 1024 * 1024}"/>
+ *                          <property name="maxSize" value="#{100 * 1024 * 1024}"/>
  *                          <property name="swapPath" value="db/swap"/>
  *                      </bean>
  *                  </list>
index 73751c6..6b8be7e 100644 (file)
@@ -41,8 +41,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -271,15 +269,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** Factory to provide I/O interfaces for read/write operations with files */
     private FileIOFactory ioFactory;
 
-    /** Next segment archived monitor. */
-    private final Object nextSegmentArchivedMonitor = new Object();
+    /** Next WAL segment archived monitor. Manages last archived index, emulates archivation in no-archiver mode. */
+    private final SegmentArchivedMonitor archivedMonitor = new SegmentArchivedMonitor();
+
+    /** Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup. */
+    private final SegmentReservationStorage reservationStorage = new SegmentReservationStorage();
 
     /** Updater for {@link #currHnd}, used for verify there are no concurrent update for current log segment handle */
     private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> CURR_HND_UPD =
         AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currHnd");
 
-    /** */
-    private volatile FileArchiver archiver;
+    /**
+     * File archiver moves segments from work directory to archive. Locked segments may be kept not moved until
+     * release. For mode archive and work folders set to equal value, archiver is not created.
+     */
+    @Nullable private volatile FileArchiver archiver;
 
     /** Compressor. */
     private volatile FileCompressor compressor;
@@ -386,7 +390,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             lastTruncatedArchiveIdx = tup == null ? -1 : tup.get1() - 1;
 
-            archiver = new FileArchiver(tup == null ? -1 : tup.get2());
+            long lastAbsArchivedIdx = tup == null ? -1 : tup.get2();
+
+            if (isArchiverEnabled())
+                archiver = new FileArchiver(lastAbsArchivedIdx);
+            else
+                archiver = null;
+
+            if (lastAbsArchivedIdx > 0)
+                archivedMonitor.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
 
             if (dsCfg.isWalCompactionEnabled()) {
                 compressor = new FileCompressor();
@@ -408,14 +420,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
+     * Archiver can be not created, all files will be written to WAL folder, using absolute segment index.
      *
+     * @return flag indicating if archiver is disabled.
      */
-    public Collection<File> getAndReserveWalFiles(FileWALPointer low, FileWALPointer high) throws IgniteCheckedException {
-        FileArchiver archiver0 = archiver;
+    private boolean isArchiverEnabled() {
+        if (walArchiveDir != null && walWorkDir != null)
+            return !walArchiveDir.equals(walWorkDir);
+
+        return !new File(dsCfg.getWalArchivePath()).equals(new File(dsCfg.getWalPath()));
+    }
 
+    /**
+     *
+     */
+    public Collection<File> getAndReserveWalFiles(FileWALPointer low, FileWALPointer high) throws IgniteCheckedException {
         final long awaitIdx = high.index() - 1;
 
-        awaitSegmentArchived(archiver0, awaitIdx);
+        archivedMonitor.awaitSegmentArchived(awaitIdx);
 
         if (!reserve(low))
             throw new IgniteCheckedException("WAL archive segment has been deleted [idx=" + low.index() + "]");
@@ -439,23 +461,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
-     * @param archiver0 Archiver.
-     * @param awaitIdx Method will wait archivation of that index.
-     */
-    private void awaitSegmentArchived(FileArchiver archiver0, long awaitIdx) throws IgniteInterruptedCheckedException {
-        synchronized (nextSegmentArchivedMonitor) {
-            while (archiver0.lastArchivedAbsoluteIndex() < awaitIdx) {
-                try {
-                    nextSegmentArchivedMonitor.wait(2000);
-                }
-                catch (InterruptedException e) {
-                    throw new IgniteInterruptedCheckedException(e);
-                }
-            }
-        }
-    }
-
-    /**
      * @throws IgniteCheckedException if WAL store path is configured and archive path isn't (or vice versa)
      */
     private void checkWalConfiguration() throws IgniteCheckedException {
@@ -517,8 +522,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         start0();
 
         if (!cctx.kernalContext().clientNode()) {
-            assert archiver != null;
-            archiver.start();
+            if (isArchiverEnabled()) {
+                assert archiver != null;
+
+                archiver.start();
+            }
 
             if (compressor != null)
                 compressor.start();
@@ -781,15 +789,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         if (mode == WALMode.NONE)
             return false;
 
-        FileArchiver archiver0 = archiver;
-
-        if (archiver0 == null)
-            throw new IgniteCheckedException("Could not reserve WAL segment: archiver == null");
-
-        archiver0.reserve(((FileWALPointer)start).index());
+        reservationStorage.reserve(((FileWALPointer)start).index());
 
         if (!hasIndex(((FileWALPointer)start).index())) {
-            archiver0.release(((FileWALPointer)start).index());
+            reservationStorage.release(((FileWALPointer)start).index());
 
             return false;
         }
@@ -804,12 +807,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         if (mode == WALMode.NONE)
             return;
 
-        FileArchiver archiver0 = archiver;
-
-        if (archiver0 == null)
-            throw new IgniteCheckedException("Could not release WAL segment: archiver == null");
-
-        archiver0.release(((FileWALPointer)start).index());
+        reservationStorage.release(((FileWALPointer)start).index());
     }
 
     /**
@@ -850,17 +848,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         int deleted = 0;
 
-        FileArchiver archiver0 = archiver;
-
         for (FileDescriptor desc : descs) {
             if (lowPtr != null && desc.idx < lowPtr.index())
                 continue;
 
             // Do not delete reserved or locked segment and any segment after it.
-            if (archiver0 != null && archiver0.reserved(desc.idx))
+            if (segmentReservedOrLocked(desc.idx))
                 return deleted;
 
-            long lastArchived = archiver0 != null ? archiver0.lastArchivedAbsoluteIndex() : lastArchivedIndex();
+            long archivedAbsIdx = archivedMonitor.lastArchivedAbsoluteIndex();
+
+            long lastArchived = archivedAbsIdx >= 0 ? archivedAbsIdx : lastArchivedIndex();
 
             // We need to leave at least one archived segment to correctly determine the archive index.
             if (desc.idx < highPtr.index() && desc.idx < lastArchived) {
@@ -879,6 +877,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         return deleted;
     }
 
+    /**
+     * Check if WAL segment locked (protected from move to archive) or reserved (protected from deletion from WAL
+     * cleanup).
+     *
+     * @param absIdx Absolute WAL segment index for check reservation.
+     * @return {@code True} if index is locked.
+     */
+    private boolean segmentReservedOrLocked(long absIdx) {
+        FileArchiver archiver0 = archiver;
+
+        return ((archiver0 != null) && archiver0.locked(absIdx))
+            || (reservationStorage.reserved(absIdx));
+
+    }
+
     /** {@inheritDoc} */
     @Override public void allowCompressionUntil(WALPointer ptr) {
         if (compressor != null)
@@ -889,7 +902,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     @Override public int walArchiveSegments() {
         long lastTruncated = lastTruncatedArchiveIdx;
 
-        long lastArchived = archiver.lastArchivedAbsoluteIndex();
+        long lastArchived = archivedMonitor.lastArchivedAbsoluteIndex();
 
         if (lastArchived == -1)
             return 0;
@@ -903,9 +916,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     @Override public boolean reserved(WALPointer ptr) {
         FileWALPointer fPtr = (FileWALPointer)ptr;
 
-        FileArchiver archiver0 = archiver;
-
-        return archiver0 != null && archiver0.reserved(fPtr.index());
+        return segmentReservedOrLocked(fPtr.index());
     }
 
     /** {@inheritDoc} */
@@ -1050,7 +1061,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException {
         long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
 
-        long segNo = absIdx % dsCfg.getWalSegments();
+        @Nullable FileArchiver archiver0 = archiver;
+
+        long segNo = archiver0 == null ? absIdx : absIdx % dsCfg.getWalSegments();
 
         File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo));
 
@@ -1110,7 +1123,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                     ser,
                     rbuf);
 
-                archiver.currentWalIndex(absIdx);
+                if (archiver0 != null)
+                    archiver0.currentWalIndex(absIdx);
+                else
+                    archivedMonitor.setLastArchivedAbsoluteIndex(absIdx - 1);
 
                 return hnd;
             }
@@ -1238,9 +1254,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
         File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER);
 
-        if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments())
-            throw new IgniteCheckedException("Failed to initialize wal (work directory contains " +
-                "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']');
+        if(isArchiverEnabled())
+            if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments())
+                throw new IgniteCheckedException("Failed to initialize wal (work directory contains " +
+                    "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']');
 
         // Allocate the first segment synchronously. All other segments will be allocated by archiver in background.
         if (allFiles.length == 0) {
@@ -1253,9 +1270,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     }
 
     /**
-     * Clears the file with zeros.
+     * Clears the file, fills with zeros for Default mode.
      *
      * @param file File to format.
+     * @throws IgniteCheckedException if formatting failed
      */
     private void formatFile(File file) throws IgniteCheckedException {
         if (log.isDebugEnabled())
@@ -1317,8 +1335,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * @throws IgniteCheckedException If failed.
      */
     private File pollNextFile(long curIdx) throws IgniteCheckedException {
+        FileArchiver archiver0 = archiver;
+
+        if (archiver0 == null) {
+            archivedMonitor.setLastArchivedAbsoluteIndex(curIdx);
+
+            return new File(walWorkDir, FileDescriptor.fileName(curIdx + 1));
+        }
+
         // Signal to archiver that we are done with the segment and it can be archived.
-        long absNextIdx = archiver.nextAbsoluteSegmentIndex(curIdx);
+        long absNextIdx = archiver0.nextAbsoluteSegmentIndex(curIdx);
 
         long segmentIdx = absNextIdx % dsCfg.getWalSegments();
 
@@ -1398,12 +1424,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /** current thread stopping advice */
         private volatile boolean stopped;
 
-        /**
-         * Maps absolute segment index to reservation counter. If counter > 0 then we wouldn't delete all segments
-         * which >= reserved segment index.
-         */
-        private NavigableMap<Long, Integer> reserved = new TreeMap<>();
-
         /** Formatted index. */
         private int formatted;
 
@@ -1423,13 +1443,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @return Last archived segment absolute index.
-         */
-        private long lastArchivedAbsoluteIndex() {
-            return lastAbsArchivedIdx;
-        }
-
-        /**
          * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
          */
         private void shutdown() throws IgniteInterruptedCheckedException {
@@ -1454,39 +1467,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @param absIdx Index for reservation.
-         */
-        private synchronized void reserve(long absIdx) {
-            Integer cur = reserved.get(absIdx);
-
-            if (cur == null)
-                reserved.put(absIdx, 1);
-            else
-                reserved.put(absIdx, cur + 1);
-        }
-
-        /**
-         * Check if WAL segment locked or reserved
+         * Check if WAL segment locked (protected from move to archive)
          *
          * @param absIdx Index for check reservation.
-         * @return {@code True} if index is reserved.
+         * @return {@code True} if index is locked.
          */
-        private synchronized boolean reserved(long absIdx) {
-            return locked.containsKey(absIdx) || reserved.floorKey(absIdx) != null;
-        }
-
-        /**
-         * @param absIdx Reserved index.
-         */
-        private synchronized void release(long absIdx) {
-            Integer cur = reserved.get(absIdx);
-
-            assert cur != null && cur >= 1 : cur;
-
-            if (cur == 1)
-                reserved.remove(absIdx);
-            else
-                reserved.put(absIdx, cur - 1);
+        private synchronized boolean locked(long absIdx) {
+            return locked.containsKey(absIdx);
         }
 
         /** {@inheritDoc} */
@@ -1577,9 +1564,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             if (compressor != null)
                 compressor.onNextSegmentArchived();
 
-            synchronized (nextSegmentArchivedMonitor) {
-                nextSegmentArchivedMonitor.notifyAll();
-            }
+            archivedMonitor.setLastArchivedAbsoluteIndex(idx);
         }
 
         /**
@@ -1638,8 +1623,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                             " lastAbsArchivedIdx=" + lastAbsArchivedIdx);
 
                     return true;
-                }
 
+                }
                 Integer cur = locked.get(absIdx);
 
                 cur = cur == null ? 1 : cur + 1;
@@ -1824,7 +1809,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             long segmentToCompress = lastCompressedIdx + 1;
 
             synchronized (this) {
-                while (segmentToCompress > Math.min(lastAllowedToCompressIdx, archiver.lastArchivedAbsoluteIndex())) {
+                while (segmentToCompress > Math.min(lastAllowedToCompressIdx, archivedMonitor.lastArchivedAbsoluteIndex())) {
                     wait();
 
                     if (stopped)
@@ -1845,11 +1830,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         private void deleteObsoleteRawSegments() {
             FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER));
 
-            FileArchiver archiver0 = archiver;
-
             for (FileDescriptor desc : descs) {
                 // Do not delete reserved or locked segment and any segment after it.
-                if (archiver0 != null && archiver0.reserved(desc.idx))
+                if (segmentReservedOrLocked(desc.idx))
                     return;
 
                 if (desc.idx < lastCompressedIdx) {
@@ -2852,14 +2835,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         /** */
         private final File walArchiveDir;
 
-        /** */
-        private final FileArchiver archiver;
+        /** See {@link FileWriteAheadLogManager#archiver}. */
+        @Nullable private final FileArchiver archiver;
 
         /** */
         private final FileDecompressor decompressor;
 
         /** */
-        private final DataStorageConfiguration psCfg;
+        private final DataStorageConfiguration dsCfg;
 
         /** Optional start pointer. */
         @Nullable
@@ -2875,9 +2858,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
          * @param walArchiveDir WAL archive dir.
          * @param start Optional start pointer.
          * @param end Optional end pointer.
-         * @param psCfg Database configuration.
+         * @param dsCfg Database configuration.
          * @param serializerFactory Serializer factory.
-         * @param archiver Archiver.
+         * @param archiver File Archiver.
          * @param decompressor Decompressor.
          *@param log Logger  @throws IgniteCheckedException If failed to initialize WAL segment.
          */
@@ -2887,10 +2870,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             File walArchiveDir,
             @Nullable FileWALPointer start,
             @Nullable FileWALPointer end,
-            DataStorageConfiguration psCfg,
+            DataStorageConfiguration dsCfg,
             @NotNull RecordSerializerFactory serializerFactory,
             FileIOFactory ioFactory,
-            FileArchiver archiver,
+            @Nullable FileArchiver archiver,
             FileDecompressor decompressor,
             IgniteLogger log
         ) throws IgniteCheckedException {
@@ -2898,10 +2881,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 cctx,
                 serializerFactory,
                 ioFactory,
-                psCfg.getWalRecordIteratorBufferSize());
+                dsCfg.getWalRecordIteratorBufferSize());
             this.walWorkDir = walWorkDir;
             this.walArchiveDir = walArchiveDir;
-            this.psCfg = psCfg;
+            this.dsCfg = dsCfg;
             this.archiver = archiver;
             this.start = start;
             this.end = end;
@@ -3014,10 +2997,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx);
 
-            if (readArchive)
-                fd = new FileDescriptor(new File(walArchiveDir, FileDescriptor.fileName(curWalSegmIdx)));
+            if (archiver == null || readArchive) {
+                fd = new FileDescriptor(new File(walArchiveDir,
+                    FileDescriptor.fileName(curWalSegmIdx)));
+            }
             else {
-                long workIdx = curWalSegmIdx % psCfg.getWalSegments();
+                long workIdx = curWalSegmIdx % dsCfg.getWalSegments();
 
                 fd = new FileDescriptor(
                     new File(walWorkDir, FileDescriptor.fileName(workIdx)),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java
new file mode 100644 (file)
index 0000000..81ecd41
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+
+/**
+ * Next WAL segment archived monitor. Manages last archived index, allows to emulate archivation in no-archiver mode.
+ * Monitor which is notified each time WAL segment is archived.
+ */
+class SegmentArchivedMonitor {
+    /**
+     * Last archived file absolute index, 0-based. Write is quarded by {@code this}. Negative value indicates there are
+     * no segments archived.
+     */
+    private volatile long lastAbsArchivedIdx = -1;
+
+    /**
+     * @return Last archived segment absolute index.
+     */
+    long lastArchivedAbsoluteIndex() {
+        return lastAbsArchivedIdx;
+    }
+
+    /**
+     * @param lastAbsArchivedIdx new value of last archived segment index
+     */
+    synchronized void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) {
+        this.lastAbsArchivedIdx = lastAbsArchivedIdx;
+
+        notifyAll();
+    }
+
+    /**
+     * Method will wait activation of particular WAL segment index.
+     *
+     * @param awaitIdx absolute index  {@link #lastArchivedAbsoluteIndex()} to become true.
+     * @throws IgniteInterruptedCheckedException if interrupted.
+     */
+    synchronized void awaitSegmentArchived(long awaitIdx) throws IgniteInterruptedCheckedException {
+        while (lastArchivedAbsoluteIndex() < awaitIdx) {
+            try {
+                wait(2000);
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedCheckedException(e);
+            }
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java
new file mode 100644 (file)
index 0000000..17da96d
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup.
+ */
+class SegmentReservationStorage {
+    /**
+     * Maps absolute segment index to reservation counter. If counter > 0 then we wouldn't delete all segments
+     * which has index >= reserved segment index. Guarded by {@code this}.
+     */
+    private NavigableMap<Long, Integer> reserved = new TreeMap<>();
+
+    /**
+     * @param absIdx Index for reservation.
+     */
+    synchronized void reserve(long absIdx) {
+        reserved.merge(absIdx, 1, (a, b) -> a + b);
+    }
+
+    /**
+     * Checks if segment is currently reserved (protected from deletion during WAL cleanup).
+     * @param absIdx Index for check reservation.
+     * @return {@code True} if index is reserved.
+     */
+    synchronized boolean reserved(long absIdx) {
+        return reserved.floorKey(absIdx) != null;
+    }
+
+    /**
+     * @param absIdx Reserved index.
+     */
+    synchronized void release(long absIdx) {
+        Integer cur = reserved.get(absIdx);
+
+        assert cur != null && cur >= 1 : cur;
+
+        if (cur == 1)
+            reserved.remove(absIdx);
+        else
+            reserved.put(absIdx, cur - 1);
+    }
+}
index 66a8aa9..cbdcc95 100644 (file)
@@ -192,10 +192,10 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest {
 
                         FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ig.context().cache().context().wal();
 
-                        Object archiver = GridTestUtils.getFieldValue(wal, "archiver");
+                        Object reservationStorage = GridTestUtils.getFieldValue(wal, "reservationStorage");
 
-                        synchronized (archiver) {
-                            Map reserved = GridTestUtils.getFieldValue(archiver, "reserved");
+                        synchronized (reservationStorage) {
+                            Map reserved = GridTestUtils.getFieldValue(reservationStorage, "reserved");
 
                             if (reserved.isEmpty())
                                 return false;
@@ -219,10 +219,10 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest {
 
                     FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ig.context().cache().context().wal();
 
-                    Object archiver = GridTestUtils.getFieldValue(wal, "archiver");
+                    Object reservationStorage = GridTestUtils.getFieldValue(wal, "reservationStorage");
 
-                    synchronized (archiver) {
-                        Map reserved = GridTestUtils.getFieldValue(archiver, "reserved");
+                    synchronized (reservationStorage) {
+                        Map reserved = GridTestUtils.getFieldValue(reservationStorage, "reserved");
 
                         if (!reserved.isEmpty())
                             return false;
@@ -402,10 +402,10 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest {
 
                         FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ig.context().cache().context().wal();
 
-                        Object archiver = GridTestUtils.getFieldValue(wal, "archiver");
+                        Object reservationStorage = GridTestUtils.getFieldValue(wal, "reservationStorage");
 
-                        synchronized (archiver) {
-                            Map reserved = GridTestUtils.getFieldValue(archiver, "reserved");
+                        synchronized (reservationStorage) {
+                            Map reserved = GridTestUtils.getFieldValue(reservationStorage, "reserved");
 
                             if (reserved.isEmpty())
                                 return false;
@@ -431,10 +431,10 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest {
 
                     FileWriteAheadLogManager wal = (FileWriteAheadLogManager)ig.context().cache().context().wal();
 
-                    Object archiver = GridTestUtils.getFieldValue(wal, "archiver");
+                    Object reservationStorage = GridTestUtils.getFieldValue(wal, "reservationStorage");
 
-                    synchronized (archiver) {
-                        Map reserved = GridTestUtils.getFieldValue(archiver, "reserved");
+                    synchronized (reservationStorage) {
+                        Map reserved = GridTestUtils.getFieldValue(reservationStorage, "reserved");
 
                         if (!reserved.isEmpty())
                             return false;
index 63c219b..2f4c8d0 100644 (file)
@@ -119,6 +119,9 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
     /** Clear properties in afterTest() method. */
     private boolean clearProperties;
 
+    /** Set WAL and Archive path to same value. */
+    private boolean setWalAndArchiveToSameValue;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         final IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -134,7 +137,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
 
         cfg.setIncludeEventTypes(EventType.EVT_WAL_SEGMENT_ARCHIVED);
 
-        DataStorageConfiguration memCfg = new DataStorageConfiguration()
+        DataStorageConfiguration dsCfg = new DataStorageConfiguration()
             .setDefaultDataRegionConfiguration(
                 new DataRegionConfiguration().setMaxSize(1024 * 1024 * 1024).setPersistenceEnabled(true))
             .setPageSize(PAGE_SIZE)
@@ -144,9 +147,24 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
             .setWalMode(customWalMode != null ? customWalMode : WALMode.BACKGROUND);
 
         if (archiveIncompleteSegmentAfterInactivityMs > 0)
-            memCfg.setWalAutoArchiveAfterInactivity(archiveIncompleteSegmentAfterInactivityMs);
+            dsCfg.setWalAutoArchiveAfterInactivity(archiveIncompleteSegmentAfterInactivityMs);
+
+        final String workDir = U.defaultWorkDirectory();
+        final File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false);
+        final File wal = new File(db, "wal");
+
+        if(setWalAndArchiveToSameValue) {
+            final String walAbsPath = wal.getAbsolutePath();
+
+            dsCfg.setWalPath(walAbsPath);
+            dsCfg.setWalArchivePath(walAbsPath);
+        } else {
+            dsCfg.setWalPath(wal.getAbsolutePath());
+            dsCfg.setWalArchivePath(new File(wal, "archive").getAbsolutePath());
+        }
+
+        cfg.setDataStorageConfiguration(dsCfg);
 
-        cfg.setDataStorageConfiguration(memCfg);
         return cfg;
     }
 
@@ -176,6 +194,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     public void testFillWalAndReadRecords() throws Exception {
+        setWalAndArchiveToSameValue = false;
         final int cacheObjectsToWrite = 10000;
 
         final Ignite ignite0 = startGrid("node0");
@@ -192,7 +211,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest {
         final String workDir = U.defaultWorkDirectory();
         final File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false);
         final File wal = new File(db, "wal");
-        final File walArchive = new File(wal, "archive");
+        final File walArchive = setWalAndArchiveToSameValue ? wal : new File(wal, "archive");
 
         final MockWalIteratorFactory mockItFactory = new MockWalIteratorFactory(log, PAGE_SIZE, consistentId, subfolderName, WAL_SEGMENTS);
         final WALIterator it = mockItFactory.iterator(wal, walArchive);