AMQ-7085 - Properly start TempUsage inside Queue start
authorChristopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Thu, 17 Jan 2019 12:58:36 +0000 (07:58 -0500)
committerChristopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Thu, 17 Jan 2019 12:58:36 +0000 (07:58 -0500)
This fix allows temp usage changes to the broker temp usage to propagate
properly to a Queue's temp usage settings

Patch applied with thanks to David Sitsky

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq-broker/src/main/java/org/apache/activemq/broker/view/BrokerDestinationView.java
activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java [new file with mode: 0644]

index 3055b57..d795c4d 100644 (file)
@@ -158,6 +158,21 @@ public class DestinationView implements DestinationViewMBean {
     }
 
     @Override
+    public int getTempUsagePercentUsage() {
+        return destination.getTempUsage().getPercentUsage();
+    }
+
+    @Override
+    public long getTempUsageLimit() {
+        return destination.getTempUsage().getLimit();
+    }
+
+    @Override
+    public void setTempUsageLimit(long limit) {
+        destination.getTempUsage().setLimit(limit);
+    }
+
+    @Override
     public long getMaxEnqueueTime() {
         return destination.getDestinationStatistics().getProcessTime().getMaxTime();
     }
index 7e09948..ad0ae32 100644 (file)
@@ -239,6 +239,24 @@ public interface DestinationViewMBean {
     void setMemoryLimit(long limit);
 
     /**
+     * @return the percentage of amount of temp usage used
+     */
+    @MBeanInfo("The percentage of the temp usage limit used")
+    int getTempUsagePercentUsage();
+
+    /**
+     * @return the amount of temp usage allocated to this destination
+     */
+    @MBeanInfo("Temp usage limit, in bytes, assigned to this destination.")
+    long getTempUsageLimit();
+
+    /**
+     * set the amount of temp usage allocated to this destination
+     * @param limit the amount of temp usage allocated to this destination
+     */
+    void setTempUsageLimit(long limit);
+
+    /**
      * @return the portion of memory from the broker memory limit for this destination
      */
     @MBeanInfo("Portion of memory from the broker memory limit for this destination")
index 95c7e15..805ef6f 100644 (file)
@@ -42,6 +42,7 @@ import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.TempUsage;
 import org.apache.activemq.usage.Usage;
 import org.slf4j.Logger;
 
@@ -279,6 +280,11 @@ public abstract class BaseDestination implements Destination {
     }
 
     @Override
+    public TempUsage getTempUsage() {
+        return systemUsage.getTempUsage();
+    }
+
+    @Override
     public DestinationStatistics getDestinationStatistics() {
         return destinationStatistics;
     }
index 81e7fa1..031015e 100644 (file)
@@ -33,6 +33,7 @@ import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.TempUsage;
 import org.apache.activemq.usage.Usage;
 
 /**
@@ -70,6 +71,8 @@ public interface Destination extends Service, Task, Message.MessageDestination {
 
     void setMemoryUsage(MemoryUsage memoryUsage);
 
+    TempUsage getTempUsage();
+
     void dispose(ConnectionContext context) throws IOException;
 
     boolean isDisposed();
index 1897c23..0d0db05 100644 (file)
@@ -32,6 +32,7 @@ import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.TempUsage;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.util.SubscriptionKey;
 
@@ -123,6 +124,11 @@ public class DestinationFilter implements Destination {
     }
 
     @Override
+    public TempUsage getTempUsage() {
+        return next.getTempUsage();
+    }
+
+    @Override
     public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
         next.removeSubscription(context, sub, lastDeliveredSequenceId);
     }
index a8463d3..e8ef717 100644 (file)
@@ -1030,6 +1030,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             if (systemUsage.getStoreUsage() != null) {
                 systemUsage.getStoreUsage().start();
             }
+            if (systemUsage.getTempUsage() != null) {
+                systemUsage.getTempUsage().start();
+            }
             systemUsage.getMemoryUsage().addUsageListener(this);
             messages.start();
             if (getExpireMessagesPeriod() > 0) {
index f009d62..f901cc2 100644 (file)
@@ -120,6 +120,23 @@ public class BrokerDestinationView {
         return destination.getMemoryUsage().getLimit();
     }
 
+    /**
+     * Gets the temp usage as a percentage for this Destination.
+     *
+     * @return Gets the temp usage as a percentage for this Destination.
+     */
+    public int getTempPercentUsage() {
+        return destination.getTempUsage().getPercentUsage();
+    }
+
+    /**
+     * Gets the temp usage limit in bytes.
+     *
+     * @return the temp usage limit in bytes.
+     */
+    public long getTempUsageLimit() {
+        return destination.getTempUsage().getLimit();
+    }
 
     /**
      * @return the average time it takes to store a message on this destination (ms)
diff --git a/activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java b/activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java
new file mode 100644 (file)
index 0000000..5848ef5
--- /dev/null
@@ -0,0 +1,100 @@
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.*;
+import javax.management.ObjectName;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests to ensure when the temp usage limit is updated on the broker the queues also have their
+ * temp usage limits automatically updated.
+ */
+public class AMQ7085Test
+{
+    private BrokerService brokerService;
+    private String testQueueName = "testAMQ7085Queue";
+    private ActiveMQQueue queue = new ActiveMQQueue(testQueueName);
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(true);
+        String connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+        final Connection conn = connectionFactory.createConnection();
+        try {
+            conn.start();
+            final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Destination queue = session.createQueue(testQueueName);
+            final Message toSend = session.createMessage();
+            toSend.setStringProperty("foo", "bar");
+            final MessageProducer producer = session.createProducer(queue);
+            producer.send(queue, toSend);
+        } finally {
+            conn.close();
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    @Test
+    public void testQueueTempUsageWhenSetExplicitly() throws Exception {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName());
+        QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(
+                queueViewMBeanName, QueueViewMBean.class, true);
+
+        // Check that by default the queue's temp limit is the same as the broker's.
+        BrokerView brokerView = brokerService.getAdminView();
+        long brokerTempLimit = brokerView.getTempLimit();
+        assertEquals(brokerTempLimit, queueViewMBean.getTempUsageLimit());
+
+        // Change the queue's temp limit independently of the broker's setting and check the broker's limit does not
+        // change.
+        long queueTempLimit = brokerTempLimit + 111;
+        queueViewMBean.setTempUsageLimit(queueTempLimit);
+        assertEquals(queueViewMBean.getTempUsageLimit(), queueTempLimit);
+        assertEquals(brokerView.getTempLimit(), brokerTempLimit);
+
+        // Now increase the broker's temp limit.  Since the queue's limit was explicitly changed it should remain
+        // unchanged.
+        long newBrokerTempLimit = brokerTempLimit + 555;
+        brokerView.setTempLimit(newBrokerTempLimit);
+        assertEquals(brokerView.getTempLimit(), newBrokerTempLimit);
+        assertEquals(queueViewMBean.getTempUsageLimit(), queueTempLimit);
+    }
+
+    @Test
+    public void testQueueTempUsageWhenBrokerTempUsageUpdated() throws Exception {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName());
+        QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(
+                queueViewMBeanName, QueueViewMBean.class, true);
+
+        // Check that by default the queue's temp limit is the same as the broker's.
+        BrokerView brokerView = brokerService.getAdminView();
+        long brokerTempLimit = brokerView.getTempLimit();
+        assertEquals(brokerTempLimit, queueViewMBean.getTempUsageLimit());
+
+        // Increase the broker's temp limit and check the queue's limit is updated to the same value.
+        long newBrokerTempLimit = brokerTempLimit + 555;
+        brokerView.setTempLimit(newBrokerTempLimit);
+        assertEquals(brokerView.getTempLimit(), newBrokerTempLimit);
+        assertEquals(queueViewMBean.getTempUsageLimit(), newBrokerTempLimit);
+    }
+}