ARTEMIS-2205 Fixing Divert Example
authorClebert Suconic <clebertsuconic@apache.org>
Wed, 30 Jan 2019 18:04:21 +0000 (13:04 -0500)
committerClebert Suconic <clebertsuconic@apache.org>
Wed, 30 Jan 2019 20:38:36 +0000 (15:38 -0500)
This is simply fixing the example under examples/features/standard/divert
Other tests are passing.
No additional tests are needed as the example on this case acts like a test.

artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java

index 1b7b79f..c3bf31a 100644 (file)
@@ -282,6 +282,12 @@ public final class BindingsImpl implements Bindings {
    private void route(final Message message,
                       final RoutingContext context,
                       final boolean groupRouting) throws Exception {
+      boolean reusableContext = context.isReusable(message, version.get());
+
+      if (!reusableContext) {
+         context.clear();
+      }
+
       /* This is a special treatment for scaled-down messages involving SnF queues.
        * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
        */
@@ -310,6 +316,7 @@ public final class BindingsImpl implements Bindings {
             binding.getBindable().route(message, context);
             routed = true;
          }
+         context.setReusable(false);
       }
       if (!routed) {
          // Remove the ids now, in order to avoid double check
@@ -319,10 +326,10 @@ public final class BindingsImpl implements Bindings {
          SimpleString groupId = message.getGroupID();
 
          if (ids != null) {
-            context.clear();
+            context.clear().setReusable(false);
             routeFromCluster(message, context, ids);
          } else if (groupingHandler != null && groupRouting && groupId != null) {
-            context.clear();
+            context.clear().setReusable(false);
             routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
          } else if (CompositeAddress.isFullyQualified(message.getAddress())) {
             Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
@@ -331,9 +338,8 @@ public final class BindingsImpl implements Bindings {
             }
          } else {
             // in a optimization, we are reusing the previous context if everything is right for it
-            // so the simpleRouting will only happen if neededk
-            if (!context.isReusable(message, version.get())) {
-               context.clear();
+            // so the simpleRouting will only happen if needed
+            if (!reusableContext) {
                simpleRouting(message, context);
             }
          }
index 605e43e..c552ca9 100644 (file)
@@ -889,6 +889,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
       if (bindingMove != null) {
          context.clear();
+         context.setReusable(false);
          bindingMove.route(message, context);
          if (addressInfo != null) {
             addressInfo.incrementRoutedMessageCount();
@@ -899,6 +900,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             addressInfo.incrementRoutedMessageCount();
          }
       } else {
+         context.setReusable(false);
          if (addressInfo != null) {
             addressInfo.incrementUnRoutedMessageCount();
          }
index 151aa41..bfde7af 100644 (file)
@@ -40,7 +40,7 @@ public interface RoutingContext {
 
    SimpleString getPreviousAddress();
 
-   void setReusable(boolean reusable);
+   RoutingContext setReusable(boolean reusable);
 
    RoutingContext setReusable(boolean reusable, int version);
 
@@ -60,7 +60,7 @@ public interface RoutingContext {
 
    int getQueueCount();
 
-   void clear();
+   RoutingContext clear();
 
    void addQueueWithAck(SimpleString address, Queue queue);
 
index e6aa210..dce20e7 100644 (file)
@@ -90,6 +90,8 @@ public class DivertImpl implements Divert {
          logger.trace("Diverting message " + message + " into " + this);
       }
 
+      context.setReusable(false);
+
       Message copy = null;
 
       // Shouldn't copy if it's not routed anywhere else
@@ -127,7 +129,7 @@ public class DivertImpl implements Divert {
          copy = message;
       }
 
-      postOffice.route(copy, context.getTransaction(), false);
+      postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false), false);
    }
 
    @Override
index b5b36bb..e9df830 100644 (file)
@@ -79,8 +79,14 @@ public final class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
-   public void setReusable(boolean reusable) {
+   public RoutingContext setReusable(boolean reusable) {
+      if (this.reusable != null && !this.reusable.booleanValue()) {
+         // cannot set to Reusable once it was set to false
+         return this;
+      }
+
       this.reusable = reusable;
+      return this;
    }
    @Override
    public RoutingContext setReusable(boolean reusable, int previousBindings) {
@@ -96,7 +102,7 @@ public final class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
-   public void clear() {
+   public RoutingContext clear() {
       map.clear();
 
       queueCount = 0;
@@ -104,6 +110,8 @@ public final class RoutingContextImpl implements RoutingContext {
       this.version = 0;
 
       this.reusable = null;
+
+      return this;
    }
 
    @Override
@@ -147,7 +155,10 @@ public final class RoutingContextImpl implements RoutingContext {
 
    @Override
    public boolean isReusable(Message message, int version) {
-      return isReusable() && queueCount > 0 && address.equals(previousAddress) && previousRoutingType == routingType && getPreviousBindingsVersion() == version;
+      if (getPreviousBindingsVersion() != version) {
+         this.reusable = false;
+      }
+      return isReusable() && queueCount > 0 && address.equals(previousAddress) && previousRoutingType == routingType;
    }
 
    @Override