AMQ-7132 - ActiveMQ reads lots of index pages upon startup (after a graceful or ungra...
authorAlan Protasio <alanprot@gmail.com>
Fri, 11 Jan 2019 05:20:57 +0000 (21:20 -0800)
committerChristopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Mon, 14 Jan 2019 15:54:26 +0000 (10:54 -0500)
12 files changed:
activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryStatsBrokerTest.java [new file with mode: 0644]
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java
activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db-1.log [new file with mode: 0644]
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.data [new file with mode: 0644]
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.redo [new file with mode: 0644]
activemq-web/src/main/java/org/apache/activemq/web/LocalBrokerFacade.java

index e2bc033..9be0d11 100644 (file)
@@ -92,6 +92,13 @@ public class SizeStatisticImpl extends StatisticImpl {
     }
 
     /**
+     * @return the maximum size of any step
+     */
+    public synchronized void setMaxSize(long size) {
+        maxSize = size;
+    }
+
+    /**
      * @return the minimum size of any step
      */
     public synchronized long getMinSize() {
@@ -99,12 +106,23 @@ public class SizeStatisticImpl extends StatisticImpl {
     }
 
     /**
+     * @return the maximum size of any step
+     */
+    public synchronized void setMinSize(long size) {
+        minSize = size;
+    }
+
+    /**
      * @return the total size of all the steps added together
      */
     public synchronized long getTotalSize() {
         return totalSize;
     }
 
+    public synchronized void setCount(long count) {
+        this.count = count;
+    }
+
     /**
      * @return the average size calculated by dividing the total size by the
      *         number of counts
index b5c466d..37f3b90 100644 (file)
@@ -459,7 +459,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                         ackedAndPrepared.remove(id);
                         if (rollback) {
                             rolledBackAcks.add(id);
-                            incrementAndAddSizeToStoreStat(dest, 0);
+                            pageFile.tx().execute(tx -> {
+                                incrementAndAddSizeToStoreStat(tx, dest, 0);
+                            });
                         }
                     }
                 } finally {
@@ -812,16 +814,19 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                     recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
                         @Override
                         public MessageStoreStatistics execute(Transaction tx) throws IOException {
-                            MessageStoreStatistics statistics = new MessageStoreStatistics();
+                            MessageStoreStatistics statistics = getStoredMessageStoreStatistics(dest, tx);
 
                             // Iterate through all index entries to get the size of each message
-                            StoredDestination sd = getStoredDestination(dest, tx);
-                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
-                                int locationSize = iterator.next().getKey().getSize();
-                                statistics.getMessageCount().increment();
-                                statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
+                            if (statistics == null) {
+                                StoredDestination sd = getStoredDestination(dest, tx);
+                                statistics = new MessageStoreStatistics();
+                                for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext(); ) {
+                                    int locationSize = iterator.next().getKey().getSize();
+                                    statistics.getMessageCount().increment();
+                                    statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
+                                }
                             }
-                           return statistics;
+                            return statistics;
                         }
                     });
                     recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size());
index 89f694b..78d2bfa 100644 (file)
@@ -134,7 +134,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     static final int OPEN_STATE = 2;
     static final long NOT_ACKED = -1;
 
-    static final int VERSION = 6;
+    static final int VERSION = 7;
 
     static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
 
@@ -188,6 +188,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             } catch (EOFException expectedOnUpgrade) {
                 openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION;
             }
+
             LOG.info("KahaDB is version " + version);
         }
 
@@ -863,7 +864,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     sd.messageIdIndex.remove(tx, keys.messageId);
                     metadata.producerSequenceIdTracker.rollback(keys.messageId);
                     undoCounter++;
-                    decrementAndSubSizeToStoreStat(key, keys.location.getSize());
+                    decrementAndSubSizeToStoreStat(tx, key, sd, keys.location.getSize());
                     // TODO: do we need to modify the ack positions for the pub sub case?
                 }
             }
@@ -979,7 +980,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                             sd.messageIdIndex.remove(tx, keys.messageId);
                             LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
                             undoCounter++;
-                            decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
+                            decrementAndSubSizeToStoreStat(tx, sdEntry.getKey(), sdEntry.getValue(), keys.location.getSize());
                             // TODO: do we need to modify the ack positions for the pub sub case?
                         }
                     } else {
@@ -1491,7 +1492,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         if (previous == null) {
             previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
             if (previous == null) {
-                incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
+                incrementAndAddSizeToStoreStat(tx, command.getDestination(), location.getSize());
                 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
                 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
                     addAckLocationForNewMessage(tx, command.getDestination(), sd, id);
@@ -1550,11 +1551,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     new MessageKeys(command.getMessageId(), location)
             );
             sd.locationIndex.put(tx, location, id);
-            incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
+            incrementAndAddSizeToStoreStat(tx, command.getDestination(), location.getSize());
 
             if (previousKeys != null) {
                 //Remove the existing from the size
-                decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
+                decrementAndSubSizeToStoreStat(tx, command.getDestination(), previousKeys.location.getSize());
 
                 //update all the subscription metrics
                 if (enableSubscriptionStatistics && sd.ackPositions != null && location.getSize() != previousKeys.location.getSize()) {
@@ -1590,7 +1591,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                 if (keys != null) {
                     sd.locationIndex.remove(tx, keys.location);
-                    decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
+                    decrementAndSubSizeToStoreStat(tx, command.getDestination(), keys.location.getSize());
                     recordAckMessageReferenceLocation(ackLocation, keys.location);
                     metadata.lastUpdate = ackLocation;
                 }  else if (LOG.isDebugEnabled()) {
@@ -1655,6 +1656,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         sd.messageIdIndex.unload(tx);
         tx.free(sd.messageIdIndex.getPageId());
 
+        tx.free(sd.messageStoreStatistics.getPageId());
+        sd.messageStoreStatistics = null;
+
         if (sd.subscriptions != null) {
             sd.subscriptions.clear(tx);
             sd.subscriptions.unload(tx);
@@ -2362,6 +2366,53 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
     }
 
+    class StoredMessageStoreStatistics {
+        private PageFile pageFile;
+        private Page<MessageStoreStatistics> page;
+        private long pageId;
+        private AtomicBoolean loaded = new AtomicBoolean();
+        private MessageStoreStatisticsMarshaller messageStoreStatisticsMarshaller = new MessageStoreStatisticsMarshaller();
+
+        StoredMessageStoreStatistics(PageFile pageFile, long pageId) {
+            this.pageId = pageId;
+            this.pageFile = pageFile;
+        }
+
+        StoredMessageStoreStatistics(PageFile pageFile, Page page) {
+            this(pageFile, page.getPageId());
+        }
+
+        public long getPageId() {
+            return pageId;
+        }
+
+        synchronized void load(Transaction tx) throws IOException {
+            if (loaded.compareAndSet(false, true)) {
+                page = tx.load(pageId, null);
+
+                if (page.getType() == Page.PAGE_FREE_TYPE) {
+                    page.set(null);
+                    tx.store(page, messageStoreStatisticsMarshaller, true);
+                }
+            }
+            page = tx.load(pageId, messageStoreStatisticsMarshaller);
+        }
+
+        synchronized MessageStoreStatistics get(Transaction tx) throws IOException {
+            load(tx);
+            return page.get();
+        }
+
+        synchronized void put(Transaction tx, MessageStoreStatistics storeStatistics) throws IOException {
+            if (page == null) {
+                page = tx.load(pageId, messageStoreStatisticsMarshaller);
+            }
+
+            page.set(storeStatistics);
+
+            tx.store(page, messageStoreStatisticsMarshaller, true);
+        }
+    }
     class StoredDestination {
 
         MessageOrderIndex orderIndex = new MessageOrderIndex();
@@ -2378,6 +2429,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         // Transient data used to track which Messages are no longer needed.
         final HashSet<String> subscriptionCache = new LinkedHashSet<>();
 
+        StoredMessageStoreStatistics messageStoreStatistics;
+
         public void trackPendingAdd(Long seq) {
             orderIndex.trackPendingAdd(seq);
         }
@@ -2392,6 +2445,38 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
     }
 
+    protected  class MessageStoreStatisticsMarshaller extends VariableMarshaller<MessageStoreStatistics> {
+
+        @Override
+        public void writePayload(final MessageStoreStatistics object, final DataOutput dataOut) throws IOException {
+            dataOut.writeBoolean(null != object);
+            if (object != null) {
+                dataOut.writeLong(object.getMessageCount().getCount());
+                dataOut.writeLong(object.getMessageSize().getTotalSize());
+                dataOut.writeLong(object.getMessageSize().getMaxSize());
+                dataOut.writeLong(object.getMessageSize().getMinSize());
+                dataOut.writeLong(object.getMessageSize().getCount());
+            }
+        }
+
+        @Override
+        public MessageStoreStatistics readPayload(final DataInput dataIn) throws IOException {
+
+            if (!dataIn.readBoolean()) {
+                return null;
+            }
+
+            MessageStoreStatistics messageStoreStatistics = new MessageStoreStatistics();
+            messageStoreStatistics.getMessageCount().setCount(dataIn.readLong());
+            messageStoreStatistics.getMessageSize().setTotalSize(dataIn.readLong());
+            messageStoreStatistics.getMessageSize().setMaxSize(dataIn.readLong());
+            messageStoreStatistics.getMessageSize().setMinSize(dataIn.readLong());
+            messageStoreStatistics.getMessageSize().setCount(dataIn.readLong());
+
+            return messageStoreStatistics;
+        }
+    }
+
     protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
 
         final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
@@ -2470,6 +2555,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     });
                 }
             }
+
             if (metadata.version >= 2) {
                 value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
                 value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
@@ -2491,6 +2577,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 });
             }
 
+            if (metadata.version >= 7) {
+                value.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, dataIn.readLong());
+            } else {
+                pageFile.tx().execute(tx -> {
+                    value.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, tx.allocate());
+                    value.messageStoreStatistics.load(tx);
+                });
+            }
+
             return value;
         }
 
@@ -2510,6 +2605,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             }
             dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
             dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
+            dataOut.writeLong(value.messageStoreStatistics.getPageId());
         }
     }
 
@@ -2543,6 +2639,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return rc;
     }
 
+    protected MessageStoreStatistics getStoredMessageStoreStatistics(KahaDestination destination, Transaction tx) throws IOException {
+        StoredDestination sd = getStoredDestination(destination, tx);
+        return  sd != null && sd.messageStoreStatistics != null ? sd.messageStoreStatistics.get(tx) : null;
+    }
+
     protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
         String key = key(destination);
         StoredDestination rc = storedDestinations.get(key);
@@ -2575,9 +2676,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 rc.ackPositions = new ListIndex<>(pageFile, tx.allocate());
                 rc.subLocations = new ListIndex<>(pageFile, tx.allocate());
             }
+
+            rc.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, tx.allocate());
+
             metadata.destinations.put(tx, key, rc);
         }
 
+        rc.messageStoreStatistics.load(tx);
+
         // Configure the marshalers and load.
         rc.orderIndex.load(tx);
 
@@ -2644,9 +2750,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 }
             }
 
-            // Configure the message references index
-
-
             // Configure the subscription cache
             for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
                 Entry<String, LastAck> entry = iterator.next();
@@ -2707,31 +2810,60 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
      * @param kahaDestination
      * @param size
      */
-    protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
-        incrementAndAddSizeToStoreStat(key(kahaDestination), size);
+    protected void incrementAndAddSizeToStoreStat(Transaction tx, KahaDestination kahaDestination, long size) throws IOException {
+        StoredDestination sd = getStoredDestination(kahaDestination, tx);
+        incrementAndAddSizeToStoreStat(tx, key(kahaDestination), sd, size);
     }
 
-    protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
+    protected void incrementAndAddSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException {
         MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
         if (storeStats != null) {
-            storeStats.getMessageCount().increment();
-            if (size > 0) {
-                storeStats.getMessageSize().addSize(size);
+            incrementAndAddSizeToStoreStat(size, storeStats);
+            sd.messageStoreStatistics.put(tx, storeStats);
+        } else if (sd != null){
+            // During the recovery the storeStats is null
+            MessageStoreStatistics storedStoreStats = sd.messageStoreStatistics.get(tx);
+            if (storedStoreStats == null) {
+                storedStoreStats = new MessageStoreStatistics();
             }
+            incrementAndAddSizeToStoreStat(size, storedStoreStats);
+            sd.messageStoreStatistics.put(tx, storedStoreStats);
         }
     }
 
-    protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
-        decrementAndSubSizeToStoreStat(key(kahaDestination), size);
+    private void incrementAndAddSizeToStoreStat(final long size, final MessageStoreStatistics storedStoreStats) {
+        storedStoreStats.getMessageCount().increment();
+        if (size > 0) {
+            storedStoreStats.getMessageSize().addSize(size);
+        }
     }
 
-    protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
+    protected void decrementAndSubSizeToStoreStat(Transaction tx, KahaDestination kahaDestination, long size) throws IOException {
+        StoredDestination sd = getStoredDestination(kahaDestination, tx);
+        decrementAndSubSizeToStoreStat(tx, key(kahaDestination), sd,size);
+    }
+
+    protected void decrementAndSubSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException {
         MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
         if (storeStats != null) {
-            storeStats.getMessageCount().decrement();
-            if (size > 0) {
-                storeStats.getMessageSize().addSize(-size);
+            decrementAndSubSizeToStoreStat(size, storeStats);
+            sd.messageStoreStatistics.put(tx, storeStats);
+        } else if (sd != null){
+            // During the recovery the storeStats is null
+            MessageStoreStatistics storedStoreStats = sd.messageStoreStatistics.get(tx);
+            if (storedStoreStats == null) {
+                storedStoreStats = new MessageStoreStatistics();
             }
+            decrementAndSubSizeToStoreStat(size, storedStoreStats);
+            sd.messageStoreStatistics.put(tx, storedStoreStats);
+        }
+    }
+
+    private void decrementAndSubSizeToStoreStat(final long size, final MessageStoreStatistics storedStoreStats) {
+        storedStoreStats.getMessageCount().decrement();
+
+        if (size > 0) {
+            storedStoreStats.getMessageSize().addSize(-size);
         }
     }
 
@@ -2936,7 +3068,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     sd.locationIndex.remove(tx, entry.getValue().location);
                     sd.messageIdIndex.remove(tx, entry.getValue().messageId);
                     sd.orderIndex.remove(tx, entry.getKey());
-                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
+                    decrementAndSubSizeToStoreStat(tx, command.getDestination(), entry.getValue().location.getSize());
                 }
             }
         }
@@ -2990,7 +3122,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     sd.locationIndex.remove(tx, entry.getValue().location);
                     sd.messageIdIndex.remove(tx, entry.getValue().messageId);
                     sd.orderIndex.remove(tx, entry.getKey());
-                    decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
+                    decrementAndSubSizeToStoreStat(tx, command.getDestination(), entry.getValue().location.getSize());
                 }
             }
         }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryStatsBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryStatsBrokerTest.java
new file mode 100644 (file)
index 0000000..3692574
--- /dev/null
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.store.MessageStoreStatistics;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+@RunWith(value = Parameterized.class)
+public class RecoveryStatsBrokerTest extends BrokerRestartTestSupport {
+
+    private RestartType restartType;
+
+    enum RestartType {
+        NORMAL,
+        FULL_RECOVERY,
+        UNCLEAN_SHUTDOWN
+    }
+
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+        persistenceAdapter.setJournalMaxFileLength(1024*1024);
+        //persistenceAdapter.setConcurrentStoreAndDispatchQueues(false);
+        persistenceAdapter.setDirectory(broker.getBrokerDataDirectory());
+        broker.setPersistenceAdapter(persistenceAdapter);
+        broker.setDestinationPolicy(policyMap);
+    }
+
+    protected void restartBroker(RestartType restartType) throws Exception {
+        if (restartType == RestartType.FULL_RECOVERY) {
+            stopBroker();
+            KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+            File dir = kahaDBPersistenceAdapter.getDirectory();
+            if (dir != null) {
+                IOHelper.deleteFile(new File(dir, "db.data"));
+            }
+            broker.start();
+        } else if (restartType == RestartType.UNCLEAN_SHUTDOWN){
+            //Simulate an unclean  shutdown
+
+            File dir = broker.getBrokerDataDirectory();
+            File backUpDir = new File(dir, "bk");
+            IOHelper.mkdirs(new File(dir, "bk"));
+
+            for (File f: dir.listFiles()) {
+                if (!f.isDirectory()) {
+                    IOHelper.copyFile(f, new File(backUpDir, f.getName()));
+                }
+            }
+
+            stopBroker();
+
+            for (File f: backUpDir.listFiles()) {
+                if (!f.isDirectory()) {
+                    IOHelper.copyFile(f, new File(dir, f.getName()));
+                }
+            }
+
+            broker.start();
+        } else {
+            restartBroker();
+        }
+    }
+
+    @Parameterized.Parameters(name="{0}")
+    public static Collection<Object[]> getTestParameters() {
+        return Arrays.asList(new Object[][] {
+                {RestartType.NORMAL},
+                {RestartType.FULL_RECOVERY},
+                {RestartType.UNCLEAN_SHUTDOWN},
+        });
+    }
+
+    public RecoveryStatsBrokerTest(RestartType restartType) {
+        this.restartType = restartType;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testStaticsRecovery() throws Exception {
+        List<ActiveMQDestination> destinations = ImmutableList.of(new ActiveMQQueue("TEST.A"), new ActiveMQQueue("TEST.B"));
+        Random random = new Random();
+        Map<ActiveMQDestination, Integer> consumedMessages = new HashMap<>();
+
+        destinations.forEach(destination -> consumedMessages.put(destination, 0));
+
+        int numberOfMessages = 10000;
+
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        for (int i = 0; i < numberOfMessages; i++) {
+            for (ActiveMQDestination destination : destinations) {
+                Message message = createMessage(producerInfo, destination);
+                message.setPersistent(true);
+                message.setProducerId(message.getMessageId().getProducerId());
+                connection.request(message);
+            }
+        }
+
+        Map<ActiveMQDestination, MessageStoreStatistics> originalStatistics = getCurrentStatistics(destinations);
+
+        checkStatistics(destinations, originalStatistics);
+
+        restartBroker(restartType);
+
+        checkStatistics(destinations, originalStatistics);
+
+        for (ActiveMQDestination destination : destinations) {
+            consume(destination, 100, false);
+        }
+
+        checkStatistics(destinations, originalStatistics);
+
+        restartBroker(restartType);
+
+        checkStatistics(destinations, originalStatistics);
+
+        for (ActiveMQDestination destination : destinations) {
+            int messagesToConsume = random.nextInt(numberOfMessages);
+            consume(destination, messagesToConsume, true);
+            consumedMessages.compute(destination, (key, value) -> value = value + messagesToConsume);
+        }
+
+        originalStatistics = getCurrentStatistics(destinations);
+
+        for (ActiveMQDestination destination : destinations) {
+            int consumedCount = consumedMessages.get(destination);
+            assertEquals("",  numberOfMessages - consumedCount, originalStatistics.get(destination).getMessageCount().getCount());
+        }
+
+        checkStatistics(destinations, originalStatistics);
+
+        restartBroker(restartType);
+
+        checkStatistics(destinations, originalStatistics);
+    }
+
+    private Map<ActiveMQDestination, MessageStoreStatistics> getCurrentStatistics(final List<ActiveMQDestination> destinations) {
+        return destinations.stream()
+                .map(destination -> getDestination(broker, destination))
+                .collect(Collectors.toMap(destination -> new ActiveMQQueue(destination.getName()), destination2 -> destination2.getMessageStore().getMessageStoreStatistics()));
+    }
+
+    private void consume(ActiveMQDestination destination, int numberOfMessages, boolean shouldAck) throws Exception {
+        // Setup the consumer and receive the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // The we should get the messages.
+        for (int i = 0; i < numberOfMessages; i++) {
+            Message m2 = receiveMessage(connection);
+            assertNotNull(m2);
+            if (shouldAck) {
+                MessageAck ack = createAck(consumerInfo, m2, 1, MessageAck.STANDARD_ACK_TYPE);
+                connection.request(ack);
+            }
+        }
+
+        connection.request(closeConnectionInfo(connectionInfo));
+    }
+
+    private void checkStatistics(final List<ActiveMQDestination> destinations, final Map<ActiveMQDestination, MessageStoreStatistics> originalStatistics) {
+        for (ActiveMQDestination destination : destinations) {
+            MessageStoreStatistics original = originalStatistics.get(destination);
+            MessageStoreStatistics actual = getDestination(broker, destination).getMessageStore().getMessageStoreStatistics();
+            assertEquals("Have Same Count", original.getMessageCount().getCount(), actual.getMessageCount().getCount());
+            assertEquals("Have Same TotalSize", original.getMessageSize().getTotalSize(), getDestination(broker, destination).getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
+        }
+    }
+
+    protected Destination getDestination(BrokerService target, ActiveMQDestination destination) {
+        RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
+        if (destination.isTemporary()) {
+            return destination.isQueue() ? regionBroker.getTempQueueRegion().getDestinationMap().get(destination) :
+                    regionBroker.getTempTopicRegion().getDestinationMap().get(destination);
+        }
+        return destination.isQueue() ?
+                regionBroker.getQueueRegion().getDestinationMap().get(destination) :
+                regionBroker.getTopicRegion().getDestinationMap().get(destination);
+    }
+}
index 0b643b9..85e785d 100644 (file)
@@ -57,6 +57,7 @@ public class KahaDBVersionTest extends TestCase {
     final static File VERSION_3_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3");
     final static File VERSION_4_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
     final static File VERSION_5_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion5");
+    final static File VERSION_6_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6");
 
 
     BrokerService broker = null;
@@ -133,6 +134,10 @@ public class KahaDBVersionTest extends TestCase {
         doConvertRestartCycle(VERSION_5_DB);
     }
 
+    public void testVersion6Conversion() throws Exception {
+        doConvertRestartCycle(VERSION_6_DB);
+    }
+
     public void doConvertRestartCycle(File existingStore) throws Exception {
 
         File testDir = new File("target/activemq-data/kahadb/versionDB");
index 4a8eea9..9ebffca 100644 (file)
@@ -177,7 +177,7 @@ public class MKahaDBStoreLimitTest {
 
         FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
         StoreUsage storeUsage = new StoreUsage();
-        storeUsage.setLimit(40*1024);
+        storeUsage.setLimit(44*1024);
 
         filtered.setUsage(storeUsage);
         filtered.setDestination(queueA);
index defba3a..0c048b2 100644 (file)
@@ -37,7 +37,7 @@ public class StoreUsageTest extends EmbeddedBrokerTestSupport {
     @Override
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = super.createBroker();
-        broker.getSystemUsage().getStoreUsage().setLimit(34 * 1024);
+        broker.getSystemUsage().getStoreUsage().setLimit(38 * 1024);
         broker.deleteAllMessages();
         return broker;
     }
index 0afc8da..8cf24e7 100644 (file)
@@ -218,7 +218,7 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org
             assertTrue("no leak of pages, always use just 11", Wait.waitFor(new Wait.Condition() {
                 @Override
                 public boolean isSatisified() throws Exception {
-                    return 11 == store.getPageFile().getPageCount() -
+                    return 12 == store.getPageFile().getPageCount() -
                             store.getPageFile().getFreePageCount();
                 }
             }, TimeUnit.SECONDS.toMillis(10)));
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db-1.log b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db-1.log
new file mode 100644 (file)
index 0000000..34facec
Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db-1.log differ
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.data b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.data
new file mode 100644 (file)
index 0000000..6c71774
Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.data differ
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.redo b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.redo
new file mode 100644 (file)
index 0000000..5cb7b87
Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.redo differ
index 50ef17c..28af6a2 100644 (file)
@@ -87,6 +87,7 @@ public class LocalBrokerFacade extends BrokerFacadeSupport {
         }
     }
 
+
     private Destination unwrap(Destination dest) {
         if (dest instanceof DestinationFilter) {
             return unwrap(((DestinationFilter) dest).getNext());