QPID-8222: [JMS AMQP 0-x][AMQP 0-8..0-91] Fix cleaning of dispatcher queue from failo...
authorAlex Rudyy <orudyy@apache.org>
Thu, 9 Aug 2018 17:55:20 +0000 (18:55 +0100)
committerAlex Rudyy <orudyy@apache.org>
Fri, 10 Aug 2018 16:49:27 +0000 (17:49 +0100)
client/src/main/java/org/apache/qpid/client/AMQSession.java
client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
systests/src/test/java/org/apache/qpid/systest/connection/FailoverBehaviourTest.java

index ffb01d8..ffcbcac 100644 (file)
@@ -2417,17 +2417,22 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
         // return the first <total number of msgs received on session>
         // messages sent by the brokers following the first rollback
         // after failover
-        _highestDeliveryTag.set(-1);
+        resetRollbackMarkers();
 
         _unacknowledgedMessageTags.clear();
         _prefetchedMessageTags.clear();
 
-        _rollbackMark.set(-1);
         clearResolvedDestinations();
         resubscribeProducers();
         resubscribeConsumers();
     }
 
+    void resetRollbackMarkers()
+    {
+        _highestDeliveryTag.set(-1);
+        _rollbackMark.set(-1);
+    }
+
     void setHasMessageListeners()
     {
         _hasMessageListeners = true;
@@ -3594,7 +3599,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
 
                 try
                 {
-                    while (connectionStopped())
+                    while (!getAMQConnection().isFailingOver() && connectionStopped())
                     {
                         _lock.wait();
                     }
@@ -3870,11 +3875,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
         }
     }
 
-    protected void clearDispatchQueue()
-    {
-        _queue.clear();
-    }
-
     private void shutdownFlowControlNoAckTaskPool()
     {
         if (_flowControlNoAckTaskPool != null)
index 6c7738a..a689840 100644 (file)
@@ -176,7 +176,16 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
     @Override
     void resubscribe() throws QpidException
     {
-        clearDispatchQueue();
+        try
+        {
+            setUsingDispatcherForCleanup(true);
+            resetRollbackMarkers();
+            syncDispatchQueue(true);
+        }
+        finally
+        {
+            setUsingDispatcherForCleanup(false);
+        }
 
         getDeliveredMessageTags().clear();
         super.resubscribe();
index 9601de6..39d8b2e 100644 (file)
@@ -560,6 +560,36 @@ public class FailoverBehaviourTest extends JmsTestBase implements ExceptionListe
         doFailoverWhilstPublishingInFlight();
     }
 
+    @Test
+    public void testFailoverWithDirtyStoppedTransactionSessionHavingPrefetchedMessages() throws Exception
+    {
+        assumeTrue(getBrokerAdmin().supportsPersistence());
+        init(Session.SESSION_TRANSACTED, true);
+
+        produceMessages();
+        _producerSession.commit();
+
+        Message message = _consumer.receive(getReceiveTimeout());
+        assertNotNull("Message is not received", message);
+
+        _connection.stop();
+
+        MessageProducer messageProducer = _consumerSession.createProducer(_destination);
+        messageProducer.send(_consumerSession.createTextMessage("Test"));
+
+        getBrokerAdmin().restart();
+
+        try
+        {
+            _consumerSession.commit();
+            fail("Exception is expected");
+        }
+        catch (JMSException e)
+        {
+            // pass
+        }
+    }
+
     private void doFailoverWhilstPublishingInFlight() throws Exception
     {
         init(Session.SESSION_TRANSACTED, false);