AMQ-7129 - Properly recover messages from KahaDB for a durable when there are
authorChristopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Wed, 9 Jan 2019 17:51:03 +0000 (12:51 -0500)
committerChristopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Wed, 9 Jan 2019 19:34:06 +0000 (14:34 -0500)
messages to recover before the stored lastAck value

With individual ack mode we need to check the durable ackPosition
sequence set in the KahaDB index on subsription load to see if there are
earlier messages before the lastAck value that still haven't been acked.
While this normally wouldn't happen it is possible in individual ack
mode

activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java [new file with mode: 0644]

index 0351d06..b5c466d 100644 (file)
@@ -80,6 +80,7 @@ import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
 import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -1086,7 +1087,17 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
-                        sd.orderIndex.setBatch(tx, cursorPos);
+                        SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
+                        //If we have ackPositions tracked then compare the first one as individual acknowledge mode
+                        //may have bumped lastAck even though there are earlier messages to still consume
+                        if (subAckPositions != null && !subAckPositions.isEmpty()
+                                && subAckPositions.getHead().getFirst() < cursorPos.lastAckedSequence) {
+                            //we have messages to ack before lastAckedSequence
+                            sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
+                        } else {
+                            subAckPositions = null;
+                            sd.orderIndex.setBatch(tx, cursorPos);
+                        }
                         recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, listener);
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
                                 .hasNext();) {
@@ -1094,6 +1105,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                             if (ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;
                             }
+                            //If subAckPositions is set then verify the sequence set contains the message still
+                            //and if it doesn't skip it
+                            if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
+                                continue;
+                            }
                             listener.recoverMessage(loadMessage(entry.getValue().location));
                         }
                         sd.orderIndex.resetCursorPosition();
@@ -1118,13 +1134,24 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                         StoredDestination sd = getStoredDestination(dest, tx);
                         sd.orderIndex.resetCursorPosition();
                         MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
+                        SequenceSet subAckPositions = null;
                         if (moc == null) {
                             LastAck pos = getLastAck(tx, sd, subscriptionKey);
                             if (pos == null) {
                                 // sub deleted
                                 return;
                             }
-                            sd.orderIndex.setBatch(tx, pos);
+                            subAckPositions = getSequenceSet(tx, sd, subscriptionKey);
+                            //If we have ackPositions tracked then compare the first one as individual acknowledge mode
+                            //may have bumped lastAck even though there are earlier messages to still consume
+                            if (subAckPositions != null && !subAckPositions.isEmpty()
+                                    && subAckPositions.getHead().getFirst() < pos.lastAckedSequence) {
+                                //we have messages to ack before lastAckedSequence
+                                sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1);
+                            } else {
+                                subAckPositions = null;
+                                sd.orderIndex.setBatch(tx, pos);
+                            }
                             moc = sd.orderIndex.cursor;
                         } else {
                             sd.orderIndex.cursor.sync(moc);
@@ -1138,6 +1165,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                             if (ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;
                             }
+                            //If subAckPositions is set then verify the sequence set contains the message still
+                            //and if it doesn't skip it
+                            if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) {
+                                continue;
+                            }
                             if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
                                 counter++;
                             }
@@ -1536,6 +1568,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                 super(runnable, null);
             }
 
+            @Override
             public void setException(final Throwable e) {
                 super.setException(e);
             }
index c7ca4fe..5d9b548 100644 (file)
@@ -3000,6 +3000,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return sd.subscriptionAcks.get(tx, subscriptionKey);
     }
 
+    protected SequenceSet getSequenceSet(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
+        if (sd.ackPositions != null) {
+            final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
+            return messageSequences;
+        }
+
+        return null;
+    }
+
     protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
         if (sd.ackPositions != null) {
             SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
new file mode 100644 (file)
index 0000000..66890a9
--- /dev/null
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class KahaDBDurableMessageRecoveryTest {
+
+    @Parameters(name = "recoverIndex")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] { { false }, { true } });
+    }
+
+    @Rule
+    public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
+    private BrokerService broker;
+    private URI brokerConnectURI;
+
+    private boolean recoverIndex;
+
+    @Before
+    public void setUpBroker() throws Exception {
+        startBroker(false);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    /**
+     * @param deleteIndex
+     */
+    public KahaDBDurableMessageRecoveryTest(boolean recoverIndex) {
+        super();
+        this.recoverIndex = recoverIndex;
+    }
+
+    protected void startBroker(boolean recoverIndex) throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(true);
+        broker.setDataDirectoryFile(dataFileDir.getRoot());
+
+        TransportConnector connector = broker.addConnector(new TransportConnector());
+        connector.setUri(new URI("tcp://0.0.0.0:0"));
+        connector.setName("tcp");
+        configurePersistence(broker, recoverIndex);
+
+        broker.start();
+        broker.waitUntilStarted();
+        brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
+    }
+
+    protected void configurePersistence(BrokerService brokerService, boolean forceRecoverIndex) throws Exception {
+        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+
+        adapter.setForceRecoverIndex(forceRecoverIndex);
+
+        // set smaller size for test
+        adapter.setJournalMaxFileLength(1024 * 20);
+    }
+
+    protected void restartBroker(boolean deleteIndex) throws Exception {
+        stopBroker();
+        startBroker(deleteIndex);
+    }
+
+    protected Session getSession(int ackMode) throws Exception {
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId1");
+        connection.start();
+        Session session = connection.createSession(false, ackMode);
+
+        return session;
+    }
+
+    /**
+     * Test that on broker restart a durable topic subscription will recover all
+     * messages before the "last ack" in KahaDB which could happen if using
+     * individual acknowledge mode and skipping messages
+     */
+    @Test
+    public void durableRecoveryIndividualAcknowledge() throws Exception {
+        String testTopic = "test.topic";
+
+        Session session = getSession(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
+        MessageProducer producer = session.createProducer(topic);
+        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "sub1");
+
+        for (int i = 1; i <= 10; i++) {
+            producer.send(session.createTextMessage("msg: " + i));
+        }
+        producer.close();
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+
+        // Receive only the 5th message using individual ack mode
+        for (int i = 1; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber.receive(1000);
+            assertNotNull(received);
+            if (i == 5) {
+                received.acknowledge();
+            }
+        }
+
+        // Verify there are 9 messages left still and restart broker
+        assertTrue(Wait.waitFor(() -> 9 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+        subscriber.close();
+        restartBroker(recoverIndex);
+
+        // Verify 9 messages exist in store on startup
+        assertTrue(Wait.waitFor(() -> 9 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+
+        // Recreate subscriber and try and receive the other 9 messages
+        session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
+        subscriber = session.createDurableSubscriber(topic, "sub1");
+
+        for (int i = 1; i <= 4; i++) {
+            TextMessage received = (TextMessage) subscriber.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+        for (int i = 6; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+
+        subscriber.close();
+        assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+    }
+
+    @Test
+    public void multipleDurableRecoveryIndividualAcknowledge() throws Exception {
+        String testTopic = "test.topic";
+
+        Session session = getSession(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
+        MessageProducer producer = session.createProducer(topic);
+        TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "sub1");
+        TopicSubscriber subscriber2 = session.createDurableSubscriber(topic, "sub2");
+
+        for (int i = 1; i <= 10; i++) {
+            producer.send(session.createTextMessage("msg: " + i));
+        }
+        producer.close();
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
+
+        // Receive 2 messages using individual ack mode only on first sub
+        for (int i = 1; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            if (i == 3 || i == 7) {
+                received.acknowledge();
+            }
+        }
+
+        // Verify there are 8 messages left still and restart broker
+        assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
+        subscriber1.close();
+        subscriber2.close();
+        restartBroker(recoverIndex);
+
+        // Verify 8 messages exist in store on startup on sub 1 and 10 on sub 2
+        assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
+
+        // Recreate subscriber and try and receive the other 8 messages
+        session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
+        subscriber1 = session.createDurableSubscriber(topic, "sub1");
+        subscriber2 = session.createDurableSubscriber(topic, "sub2");
+
+        for (int i = 1; i <= 2; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+        for (int i = 4; i <= 6; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+        for (int i = 8; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+
+        // Make sure sub 2 gets all 10
+        for (int i = 1; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber2.receive(1000);
+            assertNotNull(received);
+            assertEquals("msg: " + i, received.getText());
+        }
+
+        subscriber1.close();
+        subscriber2.close();
+        assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 0 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
+    }
+
+    @Test
+    public void multipleDurableTestRecoverSubscription() throws Exception {
+        String testTopic = "test.topic";
+
+        Session session = getSession(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        ActiveMQTopic topic = (ActiveMQTopic) session.createTopic(testTopic);
+        MessageProducer producer = session.createProducer(topic);
+        TopicSubscriber subscriber1 = session.createDurableSubscriber(topic, "sub1");
+        TopicSubscriber subscriber2 = session.createDurableSubscriber(topic, "sub2");
+
+        for (int i = 1; i <= 10; i++) {
+            producer.send(session.createTextMessage("msg: " + i));
+        }
+        producer.close();
+
+        // Receive 2 messages using individual ack mode only on first sub
+        for (int i = 1; i <= 10; i++) {
+            TextMessage received = (TextMessage) subscriber1.receive(1000);
+            assertNotNull(received);
+            if (i == 3 || i == 7) {
+                received.acknowledge();
+            }
+        }
+
+        // Verify there are 8 messages left on sub 1 and 10 on sub2 and restart
+        assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
+        assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
+        subscriber1.close();
+        subscriber2.close();
+        restartBroker(recoverIndex);
+
+        //Manually recover subscription and verify proper messages are loaded
+        final Topic brokerTopic = (Topic) broker.getDestination(topic);
+        final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
+        final AtomicInteger sub1Recovered = new AtomicInteger();
+        final AtomicInteger sub2Recovered = new AtomicInteger();
+        store.recoverSubscription("clientId1", "sub1", new MessageRecoveryListener() {
+            @Override
+            public boolean recoverMessageReference(MessageId ref) throws Exception {
+                return false;
+            }
+
+            @Override
+            public boolean recoverMessage(Message message) throws Exception {
+                TextMessage textMessage = (TextMessage) message;
+                if (textMessage.getText().equals("msg: " + 3) || textMessage.getText().equals("msg: " + 7)) {
+                    throw new IllegalStateException("Got wrong message: " + textMessage.getText());
+                }
+                sub1Recovered.incrementAndGet();
+                return true;
+            }
+
+            @Override
+            public boolean isDuplicate(MessageId ref) {
+                return false;
+            }
+
+            @Override
+            public boolean hasSpace() {
+                return true;
+            }
+        });
+
+        store.recoverSubscription("clientId1", "sub2", new MessageRecoveryListener() {
+            @Override
+            public boolean recoverMessageReference(MessageId ref) throws Exception {
+                return false;
+            }
+
+            @Override
+            public boolean recoverMessage(Message message) throws Exception {
+                sub2Recovered.incrementAndGet();
+                return true;
+            }
+
+            @Override
+            public boolean isDuplicate(MessageId ref) {
+                return false;
+            }
+
+            @Override
+            public boolean hasSpace() {
+                return true;
+            }
+        });
+
+        //Verify proper number of messages are recovered
+        assertEquals(8, sub1Recovered.get());
+        assertEquals(10, sub2Recovered.get());
+    }
+
+    protected long getPendingMessageCount(ActiveMQTopic topic, String clientId, String subId) throws Exception {
+        final Topic brokerTopic = (Topic) broker.getDestination(topic);
+        final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
+        return store.getMessageCount(clientId, subId);
+    }
+}