AMQ-7129 - fix durable message size statistics with individual ack
authorChristopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Fri, 11 Jan 2019 14:56:03 +0000 (09:56 -0500)
committerChristopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Fri, 11 Jan 2019 14:56:57 +0000 (09:56 -0500)
Make sure that the pending message size for a durable sub only includes
messages part of the ack range

activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java

index 5d9b548..0bc6e81 100644 (file)
@@ -3030,16 +3030,23 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
 
             if (messageSequences != null) {
-                Sequence head = messageSequences.getHead();
-                if (head != null) {
+                if (!messageSequences.isEmpty()) {
+                    final Sequence head = messageSequences.getHead();
+
                     //get an iterator over the order index starting at the first unacked message
                     //and go over each message to add up the size
                     Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
                             new MessageOrderCursor(head.getFirst()));
 
+                    final boolean contiguousRange = messageSequences.size() == 1;
                     while (iterator.hasNext()) {
                         Entry<Long, MessageKeys> entry = iterator.next();
-                        locationSize += entry.getValue().location.getSize();
+                        //Verify sequence contains the key
+                        //if contiguous we just add all starting with the first but if not
+                        //we need to check if the id is part of the range - could happen if individual ack mode was used
+                        if (contiguousRange || messageSequences.contains(entry.getKey())) {
+                            locationSize += entry.getValue().location.getSize();
+                        }
                     }
                 }
             }
index a44e8c0..519648e 100644 (file)
@@ -209,6 +209,12 @@ public class KahaDBDurableMessageRecoveryTest {
         // Verify there are 8 messages left still and restart broker
         assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
         assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
+
+        //Verify the pending size is less for sub1
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < getPendingMessageSize(topic, "clientId1", "sub2"));
+
         subscriber1.close();
         subscriber2.close();
         restartBroker(recoverIndex);
@@ -217,6 +223,11 @@ public class KahaDBDurableMessageRecoveryTest {
         assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
         assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
 
+        //Verify the pending size is less for sub1
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < getPendingMessageSize(topic, "clientId1", "sub2"));
+
         // Recreate subscriber and try and receive the other 8 messages
         session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
         subscriber1 = session.createDurableSubscriber(topic, "sub1");
@@ -347,4 +358,10 @@ public class KahaDBDurableMessageRecoveryTest {
         final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
         return store.getMessageCount(clientId, subId);
     }
+
+    protected long getPendingMessageSize(ActiveMQTopic topic, String clientId, String subId) throws Exception {
+        final Topic brokerTopic = (Topic) broker.getDestination(topic);
+        final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
+        return store.getMessageSize(clientId, subId);
+    }
 }