QPIDJMS-434 Safe iteration over consumer that can close during
authorTimothy Bish <tabish121@gmail.com>
Tue, 20 Nov 2018 22:06:05 +0000 (17:06 -0500)
committerTimothy Bish <tabish121@gmail.com>
Tue, 20 Nov 2018 22:06:05 +0000 (17:06 -0500)
Ensure that we safely iterate over the consumers when processing a
client acknowledge to ensure that deferred closures don't cause a
ConcurrentModificationException which will error out the acknowledge
operation.

qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java

index 83f12f7..c71a485 100644 (file)
@@ -71,7 +71,10 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
      *      controls the acknowledgement that is applied to each message.
      */
     public void acknowledge(final ACK_TYPE ackType) {
-        for (AmqpConsumer consumer : consumers.values()) {
+        // A consumer whose close was deferred will be closed and removed from the consumers
+        // map so we must copy the entries to safely traverse the collection during this operation.
+        List<AmqpConsumer> consumers = new ArrayList<>(this.consumers.values());
+        for (AmqpConsumer consumer : consumers) {
             consumer.acknowledge(ackType);
         }
     }
index fbc0e41..7be79eb 100644 (file)
@@ -1590,7 +1590,85 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout=20000)
-    public void testConsumerWithDeferredCloseActsAsClosed() throws Exception {
+    public void testCloseConsumersWithDeferredAckHandledLaterWhenlastConsumedMessageIsAcked() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final int DEFAULT_PREFETCH = 10;
+
+            // Set to fixed known value to reduce breakage if defaults are changed.
+            Connection connection = testFixture.establishConnecton(testPeer, "jms.prefetchPolicy.all=" + DEFAULT_PREFETCH);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = session.createQueue(getTestName());
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content-for-consumer-1"),
+                    1, false, false, Matchers.equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH)), 1, true);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content-for-consumer-2"),
+                    1, false, false, Matchers.equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH)), 2, true);
+
+            final CountDownLatch expected = new CountDownLatch(2);
+            ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+                    expected.countDown();
+                }
+            });
+
+            // These are our two consumers, the first gets a message and abandons it, the second will have
+            // acknowledge called on its message which will lead to the message for the first to be acknowledged
+            // and then it's link will be closed.
+            MessageConsumer consumer1 = session.createConsumer(queue);
+            MessageConsumer consumer2 = session.createConsumer(queue);
+            Message receivedMessage1 = null;
+            Message receivedMessage2 = null;
+
+            // Ensure all the messages arrived so that the matching below is deterministic
+            assertTrue("Expected transfers didnt occur: " + expected.getCount(), expected.await(5, TimeUnit.SECONDS));
+
+            // Take our two messages from the queue leaving them in a delivered state.
+            receivedMessage1 = consumer1.receive(3000);
+            assertNotNull(receivedMessage1);
+            assertTrue(receivedMessage1 instanceof TextMessage);
+            receivedMessage2 = consumer2.receive(3000);
+            assertNotNull(receivedMessage2);
+            assertTrue(receivedMessage2 instanceof TextMessage);
+
+            // Expect the client to then drain off all credit from the link when "closed"
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - 1)));
+            // Expect the client to then drain off all credit from the link when "closed"
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(DEFAULT_PREFETCH - 1)));
+
+            // Close should be deferred as the messages were delivered but not acknowledged.
+            consumer1.close();
+            consumer2.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+
+            // Now the links should close as we tear down the deferred consumers
+            testPeer.expectDetach(true, true, true);
+            testPeer.expectDetach(true, true, true);
+
+            // Acknowledge the last read message, which should accept all previous messages as well
+            // and our consumers should then close their links in turn.
+            receivedMessage2.acknowledge();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testConsumerWithDeferredCloseAcksAsClosed() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             final int DEFAULT_PREFETCH = 100;