AMQ-7143 - Temporary transaction file (PageFile) being opened and closed many times... 227/head
authorAlan Protasio <alanprot@gmail.com>
Tue, 29 Jan 2019 22:44:06 +0000 (14:44 -0800)
committerChristopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Thu, 31 Jan 2019 15:31:16 +0000 (10:31 -0500)
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java
activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/TransactionTest.java [new file with mode: 0644]

index 5b898f2..15694d4 100644 (file)
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -144,7 +145,7 @@ public class PageFile {
     // Persistent settings stored in the page file.
     private MetaData metaData;
 
-    private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
+    private final HashMap<File, RandomAccessFile> tmpFilesForRemoval = new HashMap<>();
 
     private boolean useLFRUEviction = false;
     private float LFUEvictionFactor = 0.2f;
@@ -197,12 +198,18 @@ public class PageFile {
             return page;
         }
 
-        public byte[] getDiskBound() throws IOException {
+        public byte[] getDiskBound(HashMap<File, RandomAccessFile> tmpFiles) throws IOException {
             if (diskBound == null && diskBoundLocation != -1) {
                 diskBound = new byte[length];
-                try(RandomAccessFile file = new RandomAccessFile(tmpFile, "r")) {
+                if (tmpFiles.containsKey(tmpFile) && tmpFiles.get(tmpFile).getChannel().isOpen()) {
+                    RandomAccessFile file = tmpFiles.get(tmpFile);
                     file.seek(diskBoundLocation);
                     file.read(diskBound);
+                } else {
+                    try (RandomAccessFile file = new RandomAccessFile(tmpFile, "r")) {
+                        file.seek(diskBoundLocation);
+                        file.read(diskBound);
+                    }
                 }
                 diskBoundLocation = -1;
             }
@@ -1144,12 +1151,12 @@ public class PageFile {
 
                 for (PageWrite w : batch) {
                     try {
-                        checksum.update(w.getDiskBound(), 0, pageSize);
+                        checksum.update(w.getDiskBound(tmpFilesForRemoval), 0, pageSize);
                     } catch (Throwable t) {
                         throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
                     }
                     recoveryFile.writeLong(w.page.getPageId());
-                    recoveryFile.write(w.getDiskBound(), 0, pageSize);
+                    recoveryFile.write(w.getDiskBound(tmpFilesForRemoval), 0, pageSize);
                 }
 
                 // Can we shrink the recovery buffer??
@@ -1176,7 +1183,7 @@ public class PageFile {
 
             for (PageWrite w : batch) {
                 writeFile.seek(toOffset(w.page.getPageId()));
-                writeFile.write(w.getDiskBound(), 0, pageSize);
+                writeFile.write(w.getDiskBound(tmpFilesForRemoval), 0, pageSize);
                 w.done();
             }
 
@@ -1197,7 +1204,8 @@ public class PageFile {
                     // the write cache.
                     if (w.isDone()) {
                         writes.remove(w.page.getPageId());
-                        if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
+                        if (w.tmpFile != null && tmpFilesForRemoval.containsKey(w.tmpFile)) {
+                            tmpFilesForRemoval.get(w.tmpFile).close();
                             if (!w.tmpFile.delete()) {
                                 throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
                             }
@@ -1213,8 +1221,12 @@ public class PageFile {
         }
     }
 
-    public void removeTmpFile(File file) {
-        tmpFilesForRemoval.add(file);
+    public void removeTmpFile(File file, RandomAccessFile randomAccessFile) throws IOException {
+        if (!tmpFilesForRemoval.containsKey(file)) {
+            tmpFilesForRemoval.put(file, randomAccessFile);
+        } else {
+            randomAccessFile.close();
+        }
     }
 
     private long recoveryFileSizeForPages(int pageCount) {
index bdb7896..52b2f99 100644 (file)
@@ -656,8 +656,7 @@ public class Transaction implements Iterable<Page> {
     public void commit() throws IOException {
         if( writeTransactionId!=-1 ) {
             if (tmpFile != null) {
-                tmpFile.close();
-                pageFile.removeTmpFile(getTempFile());
+                pageFile.removeTmpFile(getTempFile(), tmpFile);
                 tmpFile = null;
                 txFile = null;
             }
@@ -683,7 +682,7 @@ public class Transaction implements Iterable<Page> {
         if( writeTransactionId!=-1 ) {
             if (tmpFile != null) {
                 tmpFile.close();
-                pageFile.removeTmpFile(getTempFile());
+                getTempFile().delete();
                 tmpFile = null;
                 txFile = null;
             }
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/TransactionTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/TransactionTest.java
new file mode 100644 (file)
index 0000000..900f8e2
--- /dev/null
@@ -0,0 +1,135 @@
+package org.apache.activemq.store.kahadb.disk.page;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import junit.framework.TestCase;
+import org.apache.activemq.store.kahadb.disk.util.Marshaller;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TransactionTest  extends TestCase {
+
+    private static long NUMBER_OF_BYTES = 10485760L;
+
+    static class TransactionTestMarshaller implements Marshaller<List<Byte>> {
+
+        public static TransactionTestMarshaller INSTANCE = new TransactionTestMarshaller();
+
+        @Override
+        public void writePayload(final List<Byte> object, final DataOutput dataOut) throws IOException {
+            for (Byte b : object) {
+                dataOut.write(b);
+            }
+        }
+
+        @Override
+        public List<Byte> readPayload(final DataInput dataIn) throws IOException {
+            List<Byte> result = new ArrayList<>();
+            for (int i = 0; i < NUMBER_OF_BYTES; i++) {
+                result.add(dataIn.readByte());
+            }
+
+            return result;
+        }
+
+        @Override
+        public int getFixedSize() {
+            return 0;
+        }
+
+        @Override
+        public boolean isDeepCopySupported() {
+            return false;
+        }
+
+        @Override
+        public List<Byte> deepCopy(final List<Byte> source) {
+            return new ArrayList<>(source);
+        }
+    }
+
+    public void testDeleteTempFileWhenRollback() throws IOException {
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.setEnablePageCaching(false);
+        pf.load();
+        System.setProperty("maxKahaDBTxSize", "" + (1024*1024));
+
+
+        Transaction tx = pf.tx();
+        Page<List<Byte>> page = tx.allocate();
+
+        page.set(getBytes());
+
+        tx.store(page, TransactionTestMarshaller.INSTANCE, true);
+
+        File tempFile = tx.getTempFile();
+
+        assertTrue(tempFile.exists());
+
+        tx.rollback();
+        pf.flush();
+
+        assertFalse(tempFile.exists());
+    }
+
+    public void testHugeTransaction() throws IOException {
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.setEnablePageCaching(false);
+        pf.load();
+        System.setProperty("maxKahaDBTxSize", "" + (1024*1024));
+
+
+        Transaction tx = pf.tx();
+        Page<List<Byte>> page = tx.allocate();
+
+        List<Byte> bytes = getBytes();
+
+        page.set(bytes);
+
+        tx.store(page, TransactionTestMarshaller.INSTANCE, true);
+        tx.commit();
+        pf.flush();
+
+        tx = pf.tx();
+
+        page = tx.load(page.getPageId(), TransactionTestMarshaller.INSTANCE);
+
+        for (int i = 0; i < NUMBER_OF_BYTES; i++) {
+            assertEquals(bytes.get(i), page.get().get(i));
+        }
+
+    }
+
+    private List<Byte> getBytes() {
+        List<Byte> bytes = new ArrayList<>();
+        byte b = 0;
+
+        for (int i = 0; i < NUMBER_OF_BYTES; i++) {
+            bytes.add(b++);
+        }
+
+        return bytes;
+    }
+}