ARTEMIS-2156 Message Duplication when using exclusive divert and clustering
authorLuis De Bello <luis.debello@mulesoft.com>
Sun, 20 Jan 2019 04:09:36 +0000 (01:09 -0300)
committerClebert Suconic <clebertsuconic@apache.org>
Wed, 23 Jan 2019 15:17:03 +0000 (10:17 -0500)
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExclusiveDivertWithClusterTest.java [new file with mode: 0644]

index 3742e23..1b7b79f 100644 (file)
@@ -23,9 +23,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.Message;
@@ -64,7 +66,7 @@ public final class BindingsImpl implements Bindings {
     */
    private final Map<SimpleString, Binding> bindingsNameMap = new ConcurrentHashMap<>();
 
-   private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<>();
+   private final Set<Binding> exclusiveBindings = new CopyOnWriteArraySet<>();
 
    private volatile MessageLoadBalancingType messageLoadBalancingType = MessageLoadBalancingType.OFF;
 
index 9075ac6..73f42bb 100644 (file)
@@ -2197,6 +2197,25 @@ public abstract class ActiveMQTestBase extends Assert {
       return (int) queue.getMessageCount();
    }
 
+   /**
+    * @param postOffice
+    * @param address
+    * @return
+    * @throws Exception
+    */
+   protected int getMessagesAdded(final PostOffice postOffice, final String address) throws Exception {
+      int messageCount = 0;
+
+      List<QueueBinding> bindings = getLocalQueueBindings(postOffice, address);
+
+      for (QueueBinding qBinding : bindings) {
+         qBinding.getQueue().flushExecutor();
+         messageCount += getMessagesAdded(qBinding.getQueue());
+      }
+
+      return messageCount;
+   }
+
    protected int getMessagesAdded(final Queue queue) {
       queue.flushExecutor();
       return (int) queue.getMessagesAdded();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExclusiveDivertWithClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ExclusiveDivertWithClusterTest.java
new file mode 100644 (file)
index 0000000..f033128
--- /dev/null
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ExclusiveDivertWithClusterTest extends ClusterTestBase {
+
+   private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      start();
+   }
+
+   @Override
+   protected ActiveMQServer createServer(final boolean realFiles,
+                                         final Configuration configuration,
+                                         final long pageSize,
+                                         final long maxAddressSize,
+                                         final Map<String, AddressSettings> settings) {
+      DivertConfiguration divertConf = new DivertConfiguration().setName("notifications-divert").setAddress("*.Provider.*.Agent.*.Status").setForwardingAddress("Notifications").setExclusive(true);
+
+      configuration.addDivertConfiguration(divertConf);
+
+      ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
+
+      if (settings != null) {
+         for (Map.Entry<String, AddressSettings> setting : settings.entrySet()) {
+            server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+         }
+      }
+
+      AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setRedeliveryDelay(0).setMaxSizeBytes(maxAddressSize).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setRedistributionDelay(0).setAutoCreateQueues(true).setAutoCreateAddresses(true);
+
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+      return server;
+   }
+
+   @Test
+   public void testExclusiveDivertDoesNotDuplicateMessageInCluster() throws Exception {
+      setupCluster(MessageLoadBalancingType.ON_DEMAND);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty(), false);
+      setupSessionFactory(1, isNetty(), false);
+
+      createQueue(0, "Notifications", "Notifications", null, false, RoutingType.ANYCAST);
+      createQueue(1, "Notifications", "Notifications", null, false, RoutingType.ANYCAST);
+
+      addConsumer(0, 0, "Notifications", null, true);
+
+      createQueue(0, "x.Provider.y.Agent.z.Status", "x.Provider.y.Agent.z.Status", null, false, RoutingType.ANYCAST);
+      createQueue(1, "x.Provider.y.Agent.z.Status", "x.Provider.y.Agent.z.Status", null, false, RoutingType.ANYCAST);
+
+      waitForBindings(0, "Notifications", 1, 1, true);
+      waitForBindings(0, "Notifications", 1, 0, false);
+
+      waitForBindings(1, "Notifications", 1, 0, true);
+      waitForBindings(1, "Notifications", 1, 1, false);
+
+      send(0, "x.Provider.y.Agent.z.Status", 1, false, null);
+
+      int messagesAdded = getMessagesAdded(servers[0].getPostOffice(), "Notifications");
+
+      assertEquals(1, messagesAdded);
+   }
+
+   protected void start() throws Exception {
+      setupServers();
+   }
+
+   protected void setupServers() throws Exception {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+   }
+
+   protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
+      setupClusterConnection("cluster0", "", messageLoadBalancingType, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster1", "", messageLoadBalancingType, 1, isNetty(), 1, 0);
+   }
+
+   protected void stopServers() throws Exception {
+      closeAllConsumers();
+
+      closeAllSessionFactories();
+
+      closeAllServerLocatorsFactories();
+
+      stopServers(0, 1);
+
+      clearServer(0, 1);
+   }
+
+   protected boolean isNetty() {
+      return false;
+   }
+}