QPID-8193: [Broker-J] Update queue entries expirations on change of queue attributes...
authorAlex Rudyy <orudyy@apache.org>
Thu, 6 Dec 2018 18:16:30 +0000 (18:16 +0000)
committerAlex Rudyy <orudyy@apache.org>
Thu, 6 Dec 2018 18:16:30 +0000 (18:16 +0000)
broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/TimeToLiveTest.java [new file with mode: 0644]

index 00c1e30..ba98f1a 100644 (file)
@@ -236,9 +236,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     private String _messageGroupDefaultGroup;
     @ManagedAttributeField
     private int _maximumDistinctGroups;
-    @ManagedAttributeField
+    @ManagedAttributeField(afterSet = "queueMessageTtlChanged")
     private long _minimumMessageTtl;
-    @ManagedAttributeField
+    @ManagedAttributeField(afterSet = "queueMessageTtlChanged")
     private long _maximumMessageTtl;
     @ManagedAttributeField
     private boolean _ensureNondestructiveConsumers;
@@ -3718,4 +3718,49 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             _transactions.remove(localTransaction);
         }
     }
+
+    @SuppressWarnings("unused")
+    private void queueMessageTtlChanged()
+    {
+        if (getState() == State.ACTIVE)
+        {
+            String taskName = String.format("Queue Housekeeping : %s : TTL Update", getName());
+            getVirtualHost().executeTask(taskName,
+                                         this::updateQueueEntryExpiration,
+                                         getSystemTaskControllerContext(taskName, _virtualHost.getPrincipal()));
+        }
+    }
+
+    private void updateQueueEntryExpiration()
+    {
+        final QueueEntryList entries = getEntries();
+        if (entries != null)
+        {
+            final QueueEntryIterator queueListIterator = entries.iterator();
+            while (!_stopped.get() && queueListIterator.advance())
+            {
+                final QueueEntry node = queueListIterator.getNode();
+                if (!node.isDeleted())
+                {
+                    ServerMessage msg = node.getMessage();
+                    if (msg != null)
+                    {
+                        try (MessageReference messageReference = msg.newReference())
+                        {
+                            updateExpiration(node);
+                        }
+                        catch (MessageDeletedException e)
+                        {
+                            // Ignore
+                        }
+                    }
+                    if (node.expired())
+                    {
+                        expireEntry(node);
+                    }
+                }
+            }
+        }
+    }
+
 }
diff --git a/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/TimeToLiveTest.java b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/endtoend/message/TimeToLiveTest.java
new file mode 100644 (file)
index 0000000..0d07643
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.http.endtoend.message;
+
+import static javax.servlet.http.HttpServletResponse.SC_OK;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+import java.util.Collections;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.qpid.tests.http.HttpRequestConfig;
+import org.apache.qpid.tests.http.HttpTestBase;
+
+@HttpRequestConfig
+public class TimeToLiveTest extends HttpTestBase
+{
+    private static final String QUEUE_NAME = "testQueue";
+    private static final long HOUSE_KEEPING_CHECK_PERIOD = 100;
+    private static final long ONE_DAY_MILLISECONDS = 24 * 60 * 60 * 1000L;
+
+    @Before
+    public void setUp()
+    {
+        getBrokerAdmin().createQueue(QUEUE_NAME);
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws Exception
+    {
+        System.setProperty("virtualhost.housekeepingCheckPeriod", String.valueOf(HOUSE_KEEPING_CHECK_PERIOD));
+    }
+
+    @AfterClass
+    public static void tearDownClass()
+    {
+        System.clearProperty("virtualhost.housekeepingCheckPeriod");
+    }
+
+    @Test
+    public void queueTimeToLiveUpdateIsAppliedToEnqueuedMessages() throws Exception
+    {
+        Connection connection = getConnection();
+        try
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+            producer.send(session.createTextMessage("A"), DeliveryMode.NON_PERSISTENT, 4, ONE_DAY_MILLISECONDS);
+        }
+        finally
+        {
+            connection.close();
+        }
+
+        getHelper().submitRequest(String.format("queue/%s", QUEUE_NAME),
+                                  "POST",
+                                  Collections.singletonMap("maximumMessageTtl", 1),
+                                  SC_OK);
+
+        Thread.sleep(HOUSE_KEEPING_CHECK_PERIOD * 2);
+
+        getHelper().submitRequest(String.format("queue/%s", QUEUE_NAME),
+                                  "POST",
+                                  Collections.singletonMap("maximumMessageTtl", 0),
+                                  SC_OK);
+
+        Connection connection2 = getConnection();
+        try
+        {
+            connection2.start();
+            Session session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
+            MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+            producer.send(session.createTextMessage("B") , DeliveryMode.NON_PERSISTENT, 4, 0);
+            Message message = consumer.receive(getReceiveTimeout());
+            assertThat(message, is(notNullValue()));
+            assertThat(message, is(instanceOf(TextMessage.class)));
+            assertThat(((TextMessage)message).getText(), is(equalTo("B")));
+        }
+        finally
+        {
+            connection2.close();
+        }
+    }
+}