AMQ7135 : do a purge before deleting the destination
authorhkesler <hkesler@contractor.cengage.com>
Sun, 13 Jan 2019 03:47:16 +0000 (22:47 -0500)
committerjgoodyear <jgoodyear@apache.org>
Mon, 14 Jan 2019 17:14:52 +0000 (13:44 -0330)
(cherry picked from commit 9f513f88781667df1a2a4b0a85153059dc097295)

activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java

index a18e793..8dcb76c 100644 (file)
@@ -297,6 +297,9 @@ public abstract class AbstractRegion implements Region {
                     }
                 }
                 destinationMap.unsynchronizedRemove(destination, dest);
+                if (dest instanceof Queue){
+                    ((Queue) dest).purge();
+                }
                 dispose(context, dest);
                 DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
                 if (destinationInterceptor != null) {
index 632294c..80d202e 100644 (file)
@@ -26,6 +26,7 @@ import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
@@ -35,6 +36,7 @@ import org.apache.activemq.advisory.DestinationSource;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.Wait;
 import org.junit.After;
@@ -75,6 +77,45 @@ public class RemoveDestinationTest {
     }
 
     @Test(timeout = 60000)
+    public void testRemoveQueue() throws Exception {
+
+        ActiveMQConnection amqConnection = (ActiveMQConnection) createConnection(true);
+
+        final DestinationSource destinationSource = amqConnection.getDestinationSource();
+        Session session = amqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("TEST.FOO");
+        MessageProducer producer = session.createProducer(queue);
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        TextMessage msg = session.createTextMessage("Hellow World");
+        producer.send(msg);
+        assertNotNull(consumer.receive(5000));
+        final ActiveMQQueue amqQueue = (ActiveMQQueue) queue;
+
+        consumer.close();
+        producer.close();
+        session.close();
+
+        assertTrue("Destination discovered", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return destinationSource.getQueues().contains(amqQueue);
+            }
+        }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)));
+
+        amqConnection.destroyDestination((ActiveMQDestination) queue);
+
+        assertTrue("Destination is removed", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return !destinationSource.getQueues().contains(amqQueue);
+            }
+        }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)));
+    }
+
+    @Test(timeout = 60000)
     public void testRemoveDestinationWithoutSubscriber() throws Exception {
 
         ActiveMQConnection amqConnection = (ActiveMQConnection) createConnection(true);