ARTEMIS-196 Implement Consumer Priority
authorMichael André Pearce <michael.andre.pearce@me.com>
Thu, 10 Jan 2019 20:09:02 +0000 (20:09 +0000)
committerClebert Suconic <clebertsuconic@apache.org>
Wed, 23 Jan 2019 16:19:24 +0000 (11:19 -0500)
Add consumer priority support
Includes refactor of consumer iterating in QueueImpl to its own logical class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs

49 files changed:
artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
artemis-commons/src/main/java/org/apache/activemq/artemis/core/PriorityAware.java [new file with mode: 0644]
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ArrayResettableIterator.java [new file with mode: 0644]
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java [new file with mode: 0644]
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIteratorBase.java [new file with mode: 0644]
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java [new file with mode: 0644]
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java [new file with mode: 0644]
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIterator.java [new file with mode: 0644]
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIteratorWrapper.java [new file with mode: 0644]
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java [new file with mode: 0644]
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/SingletonIterator.java [new file with mode: 0644]
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/UpdatableIterator.java [new file with mode: 0644]
artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiIteratorTest.java [new file with mode: 0644]
artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiResettableIteratorTest.java [new file with mode: 0644]
artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/PriorityCollectionTest.java [new file with mode: 0644]
artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/UpdatableIteratorTest.java [new file with mode: 0644]
artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java [new file with mode: 0644]
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java [new file with mode: 0644]
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java [new file with mode: 0644]
docs/user-manual/en/SUMMARY.md
docs/user-manual/en/consumer-priority.md [new file with mode: 0644]
tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java [new file with mode: 0644]
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java [new file with mode: 0644]
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java [new file with mode: 0644]
tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java

index 12c5f86..0f7eb3c 100644 (file)
@@ -32,6 +32,7 @@ public class QueueAttributes implements Serializable {
    public static final String PURGE_ON_NO_CONSUMERS = "purge-on-no-consumers";
    public static final String CONSUMERS_BEFORE_DISPATCH = "consumers-before-dispatch";
    public static final String DELAY_BEFORE_DISPATCH = "delay-before-dispatch";
+   public static final String CONSUMER_PRIORITY = "consumer-priority";
 
    private RoutingType routingType;
    private SimpleString filterString;
@@ -44,6 +45,7 @@ public class QueueAttributes implements Serializable {
    private Boolean purgeOnNoConsumers;
    private Integer consumersBeforeDispatch;
    private Long delayBeforeDispatch;
+   private Integer consumerPriority;
 
    public void set(String key, String value) {
       if (key != null && value != null) {
@@ -69,6 +71,8 @@ public class QueueAttributes implements Serializable {
             setConsumersBeforeDispatch(Integer.valueOf(value));
          } else if (key.equals(DELAY_BEFORE_DISPATCH)) {
             setDelayBeforeDispatch(Long.valueOf(value));
+         } else if (key.equals(CONSUMER_PRIORITY)) {
+            setConsumerPriority(Integer.valueOf(value));
          }
       }
    }
@@ -172,4 +176,13 @@ public class QueueAttributes implements Serializable {
       return this;
    }
 
+   public Integer getConsumerPriority() {
+      return consumerPriority;
+   }
+
+   public QueueAttributes setConsumerPriority(Integer consumerPriority) {
+      this.consumerPriority = consumerPriority;
+      return this;
+   }
+
 }
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/PriorityAware.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/PriorityAware.java
new file mode 100644 (file)
index 0000000..e90015a
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core;
+
+public interface PriorityAware {
+
+   int getPriority();
+}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ArrayResettableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ArrayResettableIterator.java
new file mode 100644 (file)
index 0000000..c0e248d
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Collection;
+
+/**
+ * Provides an Array Iterator that is able to reset, allowing you to iterate over the full array.
+ * It achieves this though by moving end position mark to the the current cursors position,
+ * so it round robins, even with reset.
+ * @param <T>
+ */
+public class ArrayResettableIterator<T> implements ResettableIterator<T> {
+
+   private final Object[] array;
+   private int cursor = 0;
+   private int endPos = -1;
+   private boolean hasNext;
+
+   public ArrayResettableIterator(Object[] array) {
+      this.array = array;
+      reset();
+   }
+
+   public static <T> ResettableIterator<T> iterator(Collection<T> collection) {
+      return new ArrayResettableIterator<>(collection.toArray());
+   }
+
+   @Override
+   public void reset() {
+      endPos = cursor;
+      hasNext = array.length > 0;
+   }
+
+   @Override
+   public boolean hasNext() {
+      return hasNext;
+   }
+
+   @Override
+   public T next() {
+      if (!hasNext) {
+         throw new IllegalStateException();
+      }
+      @SuppressWarnings("unchecked") T result = (T) array[cursor];
+      cursor++;
+      if (cursor == array.length) {
+         cursor = 0;
+      }
+      if (cursor == endPos) {
+         hasNext = false;
+      }
+      return result;
+   }
+}
\ No newline at end of file
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
new file mode 100644 (file)
index 0000000..738420c
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param <T> type of the class of the iterator.
+ */
+public class MultiIterator<T> extends MultiIteratorBase<T, Iterator<T>> {
+
+   public MultiIterator(Iterator<T>[] iterators) {
+      super(iterators);
+   }
+}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIteratorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIteratorBase.java
new file mode 100644 (file)
index 0000000..ab866a1
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Abstract Iterator that works over multiple underlying iterators.
+ *
+ * @param <T> type of the class of the iterator.
+ * @param <I> type of the iterator
+ */
+abstract class MultiIteratorBase<T, I extends Iterator<T>> implements Iterator<T> {
+
+   private final I[] iterators;
+   private int index = -1;
+
+   MultiIteratorBase(I[] iterators) {
+      this.iterators = iterators;
+   }
+
+   @Override
+   public boolean hasNext() {
+      while (true) {
+         if (index != -1) {
+            I currentIterator = get(index);
+            if (currentIterator.hasNext()) {
+               return true;
+            }
+         }
+         int next = index + 1;
+         if (next < iterators.length) {
+            moveTo(next);
+         } else {
+            return false;
+         }
+      }
+   }
+
+   @Override
+   public T next() {
+      while (true) {
+         if (index != -1) {
+            I currentIterator = get(index);
+            if (currentIterator.hasNext()) {
+               return currentIterator.next();
+            }
+         }
+         int next = index + 1;
+         if (next < iterators.length) {
+            moveTo(next);
+         } else {
+            return null;
+         }
+      }
+   }
+
+   protected void moveTo(int index) {
+      this.index = index;
+   }
+
+   protected I get(int index) {
+      return iterators[index];
+   }
+}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java
new file mode 100644 (file)
index 0000000..dcea7da
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+/**
+ * Extends MultiIterator, adding the ability if the underlying iterators are resettable, then its self can reset.
+ * It achieves this by going back to the first iterator, and as moves to another iterator it resets it.
+ *
+ * @param <T> type of the class of the iterator.
+ */
+public class MultiResettableIterator<T> extends MultiIteratorBase<T, ResettableIterator<T>> implements ResettableIterator<T> {
+
+   public MultiResettableIterator(ResettableIterator<T>[] iterators) {
+      super(iterators);
+   }
+
+   @Override
+   protected void moveTo(int index) {
+      super.moveTo(index);
+      if (index > -1) {
+         get(index).reset();
+      }
+   }
+
+   @Override
+   public void reset() {
+      moveTo(-1);
+   }
+}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
new file mode 100644 (file)
index 0000000..fb96f0b
--- /dev/null
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
+ *
+ * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware.
+ */
+public class PriorityCollection<T extends PriorityAware> extends AbstractCollection<T> {
+
+   private final Supplier<Collection<T>> supplier;
+   private volatile PriorityHolder<T>[] priorityHolders = newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder<T>[] priorityHolders) {
+      this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder<T>[] getArray() {
+      return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier<Collection<T>> supplier) {
+      this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static <T> PriorityHolder<T>[] newPrioritySetArrayInstance(int length) {
+      return (PriorityHolder<T>[]) Array.newInstance(PriorityHolder.class, length);
+   }
+
+   @Override
+   public int size() {
+      return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+      return size() == 0;
+   }
+
+   public Set<Integer> getPriorites() {
+      PriorityHolder<T>[] snapshot = getArray();
+      return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator<T> iterator() {
+      Iterator<T>[] iterators = getIterators();
+      return new MultiIterator<>(iterators);
+   }
+
+   private Iterator<T>[] getIterators() {
+      PriorityHolder<T>[] snapshot = this.getArray();
+      int size = snapshot.length;
+      Iterator<T>[] iterators = newIteratorArrayInstance(size);
+      for (int i = 0; i < size; i++) {
+         iterators[i] = snapshot[i].getValues().iterator();
+      }
+      return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static <T> Iterator<T>[] newIteratorArrayInstance(int length) {
+      return (Iterator<T>[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator<T> resettableIterator() {
+      return new MultiResettableIterator<T>(getResettableIterators());
+   }
+
+   private ResettableIterator<T>[] getResettableIterators() {
+      PriorityHolder<T>[] snapshot = this.getArray();
+      int size = snapshot.length;
+      ResettableIterator<T>[] iterators = newResettableIteratorArrayInstance(size);
+      for (int i = 0; i < size; i++) {
+         iterators[i] = ArrayResettableIterator.iterator(snapshot[i].getValues());
+      }
+      return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static <T> ResettableIterator<T>[] newResettableIteratorArrayInstance(int length) {
+      return (ResettableIterator<T>[]) Array.newInstance(ResettableIterator.class, length);
+   }
+
+   @Override
+   public void forEach(Consumer<? super T> action) {
+      Objects.requireNonNull(action);
+      PriorityHolder<T>[] current = getArray();
+      int len = current.length;
+      for (int i = 0; i < len; ++i) {
+         current[i].getValues().forEach(action);
+      }
+   }
+
+   private Collection<T> getCollection(int priority, boolean createIfMissing) {
+      PriorityHolder<T>[] current = getArray();
+      int low = 0;
+      int high = current.length - 1;
+
+      while (low <= high) {
+         int mid = (low + high) / 2;
+         PriorityHolder<T> midVal = current[mid];
+
+         if (midVal.getPriority() > priority)
+            low = mid + 1;
+         else if (midVal.getPriority() < priority)
+            high = mid - 1;
+         else
+            return midVal.getValues(); //key found
+      }
+
+      if (createIfMissing) {
+         PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(current.length + 1);
+         if (low > 0) {
+            System.arraycopy(current, 0, newArray, 0, low);
+         }
+         if (current.length - low > 0) {
+            System.arraycopy(current, low, newArray, low + 1, current.length - low);
+         }
+         newArray[low] = new PriorityHolder<T>(priority, supplier);
+         setArray(newArray);
+         return newArray[low].getValues();
+      }
+      return null;
+   }
+
+   @Override
+   public synchronized boolean add(T t) {
+      if (size() == Integer.MAX_VALUE) return false;
+      boolean result = addInternal(t);
+      calcSize();
+      return result;
+   }
+
+   private boolean addInternal(T t) {
+      if (t == null) return false;
+      Collection<T> priority = getCollection(t.getPriority(), true);
+      return priority.add(t);
+   }
+
+   @Override
+   public synchronized boolean remove(Object o) {
+      boolean result = removeInternal(o);
+      calcSize();
+      return result;
+   }
+
+   private boolean removeInternal(Object o) {
+      if (o instanceof PriorityAware) {
+         PriorityAware priorityAware = (PriorityAware) o;
+         Collection<T> priority = getCollection(priorityAware.getPriority(), false);
+         boolean result = priority != null && priority.remove(priorityAware);
+         if (priority != null && priority.isEmpty()) {
+            removeCollection(priorityAware.getPriority());
+         }
+         return result;
+      } else {
+         return false;
+      }
+   }
+
+   private Collection<T> removeCollection(int priority) {
+      PriorityHolder<T>[] current = getArray();
+      int len = current.length;
+      int low = 0;
+      int high = len - 1;
+
+      while (low <= high) {
+         int mid = (low + high) / 2;
+         PriorityHolder<T> midVal = current[mid];
+
+         if (midVal.getPriority() > priority)
+            low = mid + 1;
+         else if (midVal.getPriority() < priority)
+            high = mid - 1;
+         else {
+            PriorityHolder<T>[] newArray = newPrioritySetArrayInstance(len - 1);
+            System.arraycopy(current, 0, newArray, 0, mid);
+            System.arraycopy(current, mid + 1, newArray, mid, len - mid - 1);
+            setArray(newArray);
+            return midVal.getValues(); //key found
+         }
+      }
+      return null;
+   }
+
+   @Override
+   public boolean containsAll(Collection<?> c) {
+      Objects.requireNonNull(c);
+      for (Object e : c)
+         if (!contains(e))
+            return false;
+      return true;
+   }
+
+   @Override
+   public synchronized boolean addAll(Collection<? extends T> c) {
+      Objects.requireNonNull(c);
+      if (size() >= Integer.MAX_VALUE - c.size()) return false;
+      boolean modified = false;
+      for (T e : c)
+         if (addInternal(e))
+            modified = true;
+      calcSize();
+      return modified;
+   }
+
+   @Override
+   public synchronized boolean removeAll(Collection<?> c) {
+      Objects.requireNonNull(c);
+      boolean modified = false;
+      for (Object o : c) {
+         if (removeInternal(o)) {
+            modified = true;
+         }
+      }
+      calcSize();
+      return modified;
+   }
+
+   @Override
+   public synchronized boolean retainAll(Collection<?> c) {
+      Objects.requireNonNull(c);
+      boolean modified = false;
+      PriorityHolder<T>[] snapshot = getArray();
+      for (PriorityHolder<T> priorityHolder : snapshot) {
+         if (priorityHolder.getValues().retainAll(c)) {
+            modified = true;
+            if (priorityHolder.getValues().isEmpty()) {
+               removeCollection(priorityHolder.getPriority());
+            }
+         }
+      }
+      calcSize();
+      return modified;
+   }
+
+   @Override
+   public synchronized void clear() {
+      PriorityHolder<T>[] snapshot = getArray();
+      for (PriorityHolder<T> priorityHolder : snapshot) {
+         priorityHolder.getValues().clear();
+      }
+      calcSize();
+   }
+
+   @Override
+   public boolean contains(Object o) {
+      return o instanceof PriorityAware && contains((PriorityAware) o);
+   }
+
+   public boolean contains(PriorityAware priorityAware) {
+      if (priorityAware == null) return false;
+      Collection<T> prioritySet = getCollection(priorityAware.getPriority(), false);
+      return prioritySet != null && prioritySet.contains(priorityAware);
+   }
+
+   private void calcSize() {
+      PriorityHolder<T>[] current = getArray();
+      int size = 0;
+      for (PriorityHolder<T> priorityHolder : current) {
+         size += priorityHolder.getValues().size();
+      }
+      this.size = size;
+   }
+
+   public static class PriorityHolder<E> implements PriorityAware {
+
+      private final int priority;
+
+      private final Collection<E> values;
+
+      public PriorityHolder(int priority, Supplier<Collection<E>> supplier) {
+         this.priority = priority;
+         this.values = supplier.get();
+      }
+
+      @Override
+      public int getPriority() {
+         return priority;
+      }
+
+      public Collection<E> getValues() {
+         return values;
+      }
+   }
+}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIterator.java
new file mode 100644 (file)
index 0000000..cb41eea
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+public interface RepeatableIterator<E> extends Iterator<E> {
+
+   /**
+    * If the current value should repeat.
+    */
+   void repeat();
+}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIteratorWrapper.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/RepeatableIteratorWrapper.java
new file mode 100644 (file)
index 0000000..2c23cba
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.function.Consumer;
+
+public class RepeatableIteratorWrapper<E> implements RepeatableIterator<E>, ResettableIterator<E> {
+
+   private ResettableIterator<E> iterator;
+   private E last;
+   private boolean repeat;
+
+   public RepeatableIteratorWrapper(ResettableIterator<E> iterator) {
+      this.iterator = iterator;
+   }
+
+   @Override
+   public void repeat() {
+      if (last != null) {
+         repeat = true;
+      }
+   }
+
+   @Override
+   public boolean hasNext() {
+      return repeat || iterator.hasNext();
+   }
+
+   @Override
+   public E next() {
+      if (repeat) {
+         repeat = false;
+         return last;
+      }
+      return last = iterator.next();
+   }
+
+   @Override
+   public void remove() {
+      iterator.remove();
+   }
+
+   @Override
+   public void forEachRemaining(Consumer<? super E> action) {
+      iterator.forEachRemaining(action);
+   }
+
+   @Override
+   public void reset() {
+      iterator.reset();
+   }
+}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ResettableIterator.java
new file mode 100644 (file)
index 0000000..f807eb1
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+public interface ResettableIterator<E> extends Iterator<E> {
+
+   /**
+    * Resets the iterator so you can re-iterate over all elements.
+    */
+   void reset();
+}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/SingletonIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/SingletonIterator.java
new file mode 100644 (file)
index 0000000..956e045
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Consumer;
+
+public class SingletonIterator<E> implements Iterator<E> {
+
+   private E value;
+
+   public static <E> Iterator<E> newInstance(E e) {
+      return new SingletonIterator<>(e);
+   }
+
+   private SingletonIterator(E value) {
+      this.value = value;
+   }
+
+   @Override
+   public boolean hasNext() {
+      return value != null;
+   }
+
+   @Override
+   public E next() {
+      if (value != null) {
+         E result = value;
+         value = null;
+         return result;
+      } else {
+         throw new NoSuchElementException();
+      }
+   }
+
+   @Override
+   public void remove() {
+      value = null;
+   }
+
+   @Override
+   public void forEachRemaining(Consumer<? super E> action) {
+      if (value != null)
+         action.accept(value);
+   }
+}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/UpdatableIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/UpdatableIterator.java
new file mode 100644 (file)
index 0000000..3ed82b4
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+public class UpdatableIterator<E> implements ResettableIterator<E>, RepeatableIterator<E> {
+
+   private final AtomicReferenceFieldUpdater<UpdatableIterator, RepeatableIteratorWrapper> changedIteratorFieldUpdater =  AtomicReferenceFieldUpdater.newUpdater(UpdatableIterator.class, RepeatableIteratorWrapper.class, "changedIterator");
+   private volatile RepeatableIteratorWrapper<E> changedIterator;
+   private RepeatableIteratorWrapper<E> currentIterator;
+
+   public UpdatableIterator(ResettableIterator<E> iterator) {
+      this.currentIterator = new RepeatableIteratorWrapper<>(iterator);
+   }
+
+   /**
+    * This can be called by another thread.
+    * It sets a new iterator, that will be picked up on the next reset.
+    *
+    * @param iterator the new iterator to update to.
+    */
+   public void update(ResettableIterator<E> iterator) {
+      changedIteratorFieldUpdater.set(this, new RepeatableIteratorWrapper<>(iterator));
+   }
+
+   /*
+    * ---- ResettableIterator Methods -----
+    * All the below ResettableIterator (including reset) methods MUST be called by the same thread,
+    * this is as any other use of Iterator.
+    */
+
+   /**
+    * When reset is called, then if a new iterator has been provided by another thread via update method,
+    * then we switch over to using the new iterator.
+    *
+    * It is important that on nulling off the changedIterator, we atomically compare and set as the
+    * changedIterator could be further updated by another thread whilst we are resetting,
+    * the subsequent update simply would be picked up on the next reset.
+    *
+    * @return this (itself).
+    */
+   @Override
+   public void reset() {
+      RepeatableIteratorWrapper<E> changedIterator = this.changedIterator;
+      if (changedIterator != null) {
+         currentIterator = changedIterator;
+         changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null);
+      }
+      currentIterator.reset();
+   }
+
+   @Override
+   public boolean hasNext() {
+      return currentIterator.hasNext();
+   }
+
+   @Override
+   public E next() {
+      return currentIterator.next();
+   }
+
+   @Override
+   public void remove() {
+      currentIterator.remove();
+   }
+
+   @Override
+   public void forEachRemaining(Consumer<? super E> action) {
+      currentIterator.forEachRemaining(action);
+   }
+
+   @Override
+   public void repeat() {
+      currentIterator.repeat();
+   }
+}
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiIteratorTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiIteratorTest.java
new file mode 100644 (file)
index 0000000..9947a5e
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MultiIteratorTest {
+
+   @Test
+   public void testSingleIterator() {
+
+      ArrayList<Integer> arrayList = new ArrayList<>();
+      for (int i = 0; i < 1000; i++) {
+         arrayList.add(i);
+      }
+
+      MultiIterator<Integer> iterator = new MultiIterator<>(new Iterator[]{arrayList.iterator()});
+      for (int i = 0; i < 1000; i++) {
+         assertTrue(iterator.hasNext());
+         assertEquals(Integer.valueOf(i), iterator.next());
+      }
+      assertFalse(iterator.hasNext());
+
+   }
+
+   @Test
+   public void testMutlipleIterators() {
+
+      ArrayList<Integer> arrayList = new ArrayList<>();
+      for (int i = 0; i < 1000; i++) {
+         arrayList.add(i);
+      }
+      ArrayList<Integer> arrayList2 = new ArrayList<>();
+      for (int i = 1000; i < 2000; i++) {
+         arrayList2.add(i);
+      }
+      ArrayList<Integer> arrayList3 = new ArrayList<>();
+      for (int i = 2000; i < 3000; i++) {
+         arrayList3.add(i);
+      }
+
+      MultiIterator<Integer> iterator = new MultiIterator<>(new Iterator[]{arrayList.iterator(), arrayList2.iterator(), arrayList3.iterator()});
+      for (int i = 0; i < 3000; i++) {
+         assertTrue(iterator.hasNext());
+         assertEquals(Integer.valueOf(i), iterator.next());
+      }
+      assertFalse(iterator.hasNext());
+
+   }
+}
\ No newline at end of file
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiResettableIteratorTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/MultiResettableIteratorTest.java
new file mode 100644 (file)
index 0000000..211679a
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class MultiResettableIteratorTest {
+
+   @Test
+   public void testSingleIterator() {
+
+      ArrayList<Integer> arrayList = new ArrayList<>();
+      for (int i = 0; i < 1000; i++) {
+         arrayList.add(i);
+      }
+
+      MultiResettableIterator<Integer> iterator = new MultiResettableIterator<>(new ResettableIterator[]{ArrayResettableIterator.iterator(arrayList)});
+      for (int i = 0; i < 1000; i++) {
+         assertTrue(iterator.hasNext());
+         assertEquals(Integer.valueOf(i), iterator.next());
+      }
+      assertFalse(iterator.hasNext());
+
+      iterator.reset();
+
+      for (int i = 0; i < 1000; i++) {
+         assertTrue(iterator.hasNext());
+         assertEquals(Integer.valueOf(i), iterator.next());
+      }
+      assertFalse(iterator.hasNext());
+   }
+
+   @Test
+   public void testMutlipleIterators() {
+
+      ArrayList<Integer> arrayList = new ArrayList<>();
+      for (int i = 0; i < 1000; i++) {
+         arrayList.add(i);
+      }
+      ArrayList<Integer> arrayList2 = new ArrayList<>();
+      for (int i = 1000; i < 2000; i++) {
+         arrayList2.add(i);
+      }
+      ArrayList<Integer> arrayList3 = new ArrayList<>();
+      for (int i = 2000; i < 3000; i++) {
+         arrayList3.add(i);
+      }
+
+      MultiResettableIterator<Integer> iterator = new MultiResettableIterator<>(new ResettableIterator[]{ArrayResettableIterator.iterator(arrayList), ArrayResettableIterator.iterator(arrayList2), ArrayResettableIterator.iterator(arrayList3)});
+      for (int i = 0; i < 3000; i++) {
+         assertTrue(iterator.hasNext());
+         assertEquals(Integer.valueOf(i), iterator.next());
+      }
+      assertFalse(iterator.hasNext());
+
+
+      //Reset and ensure we re-iterate all.
+      iterator.reset();
+
+      for (int i = 0; i < 3000; i++) {
+         assertTrue(iterator.hasNext());
+         assertEquals(Integer.valueOf(i), iterator.next());
+      }
+      assertFalse(iterator.hasNext());
+
+
+   }
+
+   @Test
+   public void testMutlipleIteratorsResetMidIteration() {
+
+      ArrayList<Integer> arrayList = new ArrayList<>();
+      for (int i = 0; i < 1000; i++) {
+         arrayList.add(i);
+      }
+      ArrayList<Integer> arrayList2 = new ArrayList<>();
+      for (int i = 1000; i < 2000; i++) {
+         arrayList2.add(i);
+      }
+      ArrayList<Integer> arrayList3 = new ArrayList<>();
+      for (int i = 2000; i < 3000; i++) {
+         arrayList3.add(i);
+      }
+
+      MultiResettableIterator<Integer> iterator = new MultiResettableIterator<>(new ResettableIterator[]{ArrayResettableIterator.iterator(arrayList), ArrayResettableIterator.iterator(arrayList2), ArrayResettableIterator.iterator(arrayList3)});
+      for (int i = 0; i < 100; i++) {
+         assertTrue(iterator.hasNext());
+         assertEquals(Integer.valueOf(i), iterator.next());
+      }
+
+      //Reset and ensure we re-iterate all.
+      iterator.reset();
+
+      for (int i = 0; i < 3000; i++) {
+         assertTrue(iterator.hasNext());
+         assertNotNull(iterator.next());
+      }
+      assertFalse(iterator.hasNext());
+
+
+   }
+}
\ No newline at end of file
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/PriorityCollectionTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/PriorityCollectionTest.java
new file mode 100644 (file)
index 0000000..97b1a6d
--- /dev/null
@@ -0,0 +1,296 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class PriorityCollectionTest {
+
+   @Test
+   public void simpleInsertions() {
+      PriorityCollection<TestPriorityAware> set = new PriorityCollection<>(CopyOnWriteArraySet::new);
+
+      assertTrue(set.isEmpty());
+      assertTrue(set.add(new TestPriorityAware(1)));
+      assertFalse(set.isEmpty());
+
+      assertTrue(set.add(new TestPriorityAware(2)));
+      assertTrue(set.add(new TestPriorityAware(3)));
+
+      assertEquals(set.size(), 3);
+
+      assertTrue(set.contains(new TestPriorityAware(1)));
+      assertEquals(set.size(), 3);
+
+      assertTrue(set.remove(new TestPriorityAware(1)));
+      assertEquals(set.size(), 2);
+      assertFalse(set.contains(new TestPriorityAware(1)));
+      assertFalse(set.contains(new TestPriorityAware(5)));
+      assertEquals(set.size(), 2);
+
+      assertTrue(set.add(new TestPriorityAware(1)));
+      assertEquals(set.size(), 3);
+      assertFalse(set.add(new TestPriorityAware(1)));
+      assertEquals(set.size(), 3);
+   }
+
+   @Test
+   public void testRemove() {
+      PriorityCollection<TestPriorityAware> set = new PriorityCollection<>(CopyOnWriteArraySet::new);
+
+      assertTrue(set.isEmpty());
+      assertTrue(set.add(new TestPriorityAware(1)));
+      assertFalse(set.isEmpty());
+
+      assertFalse(set.remove(new TestPriorityAware(0)));
+      assertFalse(set.isEmpty());
+      assertTrue(set.remove(new TestPriorityAware(1)));
+      assertTrue(set.isEmpty());
+   }
+
+   @Test
+   public void concurrentInsertions() throws Throwable {
+      PriorityCollection<TestPriorityAware> set = new PriorityCollection<>(CopyOnWriteArraySet::new);
+      ExecutorService executor = Executors.newCachedThreadPool();
+
+      final int nThreads = 16;
+      final int N = 1000;
+
+      List<Future<?>> futures = new ArrayList<>();
+      for (int i = 0; i < nThreads; i++) {
+         final int threadIdx = i;
+
+         futures.add(executor.submit(() -> {
+            Random random = new Random();
+
+            for (int j = 0; j < N; j++) {
+               long key = Math.abs(random.nextLong());
+               // Ensure keys are unique
+               key -= key % (threadIdx + 1);
+
+               set.add(new TestPriorityAware(key));
+            }
+         }));
+      }
+
+      for (Future<?> future : futures) {
+         future.get();
+      }
+
+      assertEquals(set.size(), N * nThreads);
+
+      executor.shutdown();
+   }
+
+   @Test
+   public void concurrentInsertionsAndReads() throws Throwable {
+      PriorityCollection<TestPriorityAware> set = new PriorityCollection<>(CopyOnWriteArraySet::new);
+      ExecutorService executor = Executors.newCachedThreadPool();
+
+      final int nThreads = 16;
+      final int N = 1000;
+
+      List<Future<?>> futures = new ArrayList<>();
+      for (int i = 0; i < nThreads; i++) {
+         final int threadIdx = i;
+
+         futures.add(executor.submit(() -> {
+            Random random = new Random();
+
+            for (int j = 0; j < N; j++) {
+               long key = Math.abs(random.nextLong());
+               // Ensure keys are unique
+               key -= key % (threadIdx + 1);
+
+               set.add(new TestPriorityAware(key));
+            }
+         }));
+
+         futures.add(executor.submit(() -> {
+            Iterator<TestPriorityAware> iterator = set.resettableIterator();
+            while (iterator.hasNext()) {
+               iterator.next();
+            }
+         }));
+      }
+
+      for (Future<?> future : futures) {
+         future.get();
+      }
+
+      assertEquals(set.size(), N * nThreads);
+
+      executor.shutdown();
+   }
+
+   @Test
+   public void testIteration() {
+      PriorityCollection<TestPriorityAware> set = new PriorityCollection<>(CopyOnWriteArraySet::new);
+
+      assertFalse(set.iterator().hasNext());
+
+      set.add(new TestPriorityAware(0));
+
+      assertTrue(set.iterator().hasNext());
+
+      set.remove(new TestPriorityAware(0));
+
+      assertFalse(set.iterator().hasNext());
+
+      set.add(new TestPriorityAware(0));
+      set.add(new TestPriorityAware(1));
+      set.add(new TestPriorityAware(2));
+
+      List<TestPriorityAware> values = new ArrayList<>(set);
+
+      assertTrue(values.contains(new TestPriorityAware(0)));
+      assertTrue(values.contains(new TestPriorityAware(1)));
+      assertTrue(values.contains(new TestPriorityAware(2)));
+
+      set.clear();
+      assertTrue(set.isEmpty());
+   }
+
+   @Test
+   public void priorityTest() {
+      PriorityCollection<TestPriority> set = new PriorityCollection<>(CopyOnWriteArraySet::new);
+      set.add(new TestPriority("A", 127));
+      set.add(new TestPriority("B", 127));
+      set.add(new TestPriority("E", 0));
+      set.add(new TestPriority("D", 20));
+      set.add(new TestPriority("C", 127));
+
+      ResettableIterator<TestPriority> iterator = set.resettableIterator();
+      iterator.reset();
+      assertTrue(iterator.hasNext());
+
+      assertEquals("A", iterator.next().getName());
+
+      //Reset iterator should mark start as current position
+      iterator.reset();
+      assertTrue(iterator.hasNext());
+      assertEquals("B", iterator.next().getName());
+
+      assertTrue(iterator.hasNext());
+      assertEquals("C", iterator.next().getName());
+
+      //Expect another A as after reset, we started at B so after A we then expect the next level
+      assertTrue(iterator.hasNext());
+      assertEquals("A", iterator.next().getName());
+
+      assertTrue(iterator.hasNext());
+      assertEquals("D", iterator.next().getName());
+
+      assertTrue(iterator.hasNext());
+      assertEquals("E", iterator.next().getName());
+
+      //We have iterated all.
+      assertFalse(iterator.hasNext());
+
+      //Reset to iterate again.
+      iterator.reset();
+
+      //We expect the iteration to round robin from last returned at the level.
+      assertTrue(iterator.hasNext());
+      assertEquals("B", iterator.next().getName());
+
+
+   }
+
+
+   private class TestPriorityAware implements PriorityAware {
+
+      private long value;
+
+      private TestPriorityAware(long value) {
+         this.value = value;
+      }
+
+      @Override
+      public int getPriority() {
+         //10 priority levels
+         return (int) value % 10;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+         if (this == o) return true;
+         if (o == null || getClass() != o.getClass()) return false;
+         TestPriorityAware that = (TestPriorityAware) o;
+         return value == that.value;
+      }
+
+      @Override
+      public int hashCode() {
+         return Objects.hash(value);
+      }
+   }
+
+   private class TestPriority implements PriorityAware {
+
+      private String name;
+      private int priority;
+
+      private TestPriority(String name, int priority) {
+         this.name = name;
+         this.priority = priority;
+      }
+
+      @Override
+      public int getPriority() {
+         return priority;
+      }
+
+      public String getName() {
+         return name;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+         if (this == o) return true;
+         if (o == null || getClass() != o.getClass()) return false;
+         TestPriority that = (TestPriority) o;
+         return priority == that.priority &&
+                 Objects.equals(name, that.name);
+      }
+
+      @Override
+      public int hashCode() {
+         return Objects.hash(name, priority);
+      }
+   }
+
+}
\ No newline at end of file
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/UpdatableIteratorTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/UpdatableIteratorTest.java
new file mode 100644 (file)
index 0000000..633672a
--- /dev/null
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class UpdatableIteratorTest {
+
+   @Test
+   public void testUnderlyingIterator() {
+
+      ArrayList<Integer> arrayList = new ArrayList<>();
+      for (int i = 0; i < 1000; i++) {
+         arrayList.add(i);
+      }
+
+      UpdatableIterator<Integer> iterator = new UpdatableIterator(ArrayResettableIterator.iterator(arrayList));
+      for (int i = 0; i < 1000; i++) {
+         assertTrue(iterator.hasNext());
+         assertEquals(Integer.valueOf(i), iterator.next());
+      }
+      assertFalse(iterator.hasNext());
+
+      iterator.reset();
+
+      for (int i = 0; i < 1000; i++) {
+         assertTrue(iterator.hasNext());
+         assertEquals(Integer.valueOf(i), iterator.next());
+      }
+      assertFalse(iterator.hasNext());
+   }
+
+   @Test
+   public void testUpdateIterator() {
+
+      ArrayList<Integer> arrayList = new ArrayList<>();
+      for (int i = 0; i < 1000; i++) {
+         arrayList.add(i);
+      }
+
+      ArrayList<Integer> arrayList2 = new ArrayList<>();
+      for (int i = 4000; i < 5000; i++) {
+         arrayList2.add(i);
+      }
+
+      UpdatableIterator<Integer> iterator = new UpdatableIterator(ArrayResettableIterator.iterator(arrayList));
+      for (int i = 0; i < 100; i++) {
+         assertTrue(iterator.hasNext());
+         assertEquals(Integer.valueOf(i), iterator.next());
+      }
+
+      //Update the iterator
+      iterator.update(ArrayResettableIterator.iterator(arrayList2));
+
+      //Ensure the current iterator in use is not updated until reset, and we iterate remaining.
+      for (int i = 100; i < 1000; i++) {
+         assertTrue(iterator.hasNext());
+         assertEquals(Integer.valueOf(i), iterator.next());
+      }
+      assertFalse(iterator.hasNext());
+
+      //Reset the iterator, we now expect to act on the updated iterator.
+      iterator.reset();
+
+      for (int i = 4000; i < 5000; i++) {
+         assertTrue(iterator.hasNext());
+         assertEquals(Integer.valueOf(i), iterator.next());
+      }
+      assertFalse(iterator.hasNext());
+   }
+}
\ No newline at end of file
index 8760c9c..7d20cfd 100644 (file)
@@ -473,6 +473,8 @@ public final class ActiveMQDefaultConfiguration {
 
    public static final int DEFAULT_MAX_QUEUE_CONSUMERS = -1;
 
+   public static final int DEFAULT_CONSUMER_PRIORITY = 0;
+
    public static final boolean DEFAULT_EXCLUSIVE = false;
 
    public static final boolean DEFAULT_LAST_VALUE = false;
@@ -1320,6 +1322,10 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_MAX_QUEUE_CONSUMERS;
    }
 
+   public static int getDefaultConsumerPriority() {
+      return DEFAULT_CONSUMER_PRIORITY;
+   }
+
    public static boolean getDefaultExclusive() {
       return DEFAULT_EXCLUSIVE;
    }
index b400c91..14ca75c 100644 (file)
@@ -879,6 +879,58 @@ public interface ClientSession extends XAResource, AutoCloseable {
     *
     * @param queueName  name of the queue to consume messages from
     * @param filter     only messages which match this filter will be consumed
+    * @param priority   the consumer priority
+    * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages.
+    * @return a ClientConsumer
+    * @throws ActiveMQException if an exception occurs while creating the ClientConsumer
+    */
+   ClientConsumer createConsumer(SimpleString queueName,
+                                 SimpleString filter,
+                                 int priority,
+                                 boolean browseOnly) throws ActiveMQException;
+
+   /**
+    * Creates a ClientConsumer to consume or browse messages matching the filter from the queue with
+    * the given name.
+    * <p>
+    * If <code>browseOnly</code> is <code>true</code>, the ClientConsumer will receive the messages
+    * from the queue but they will not be consumed (the messages will remain in the queue). Note
+    * that paged messages will not be in the queue, and will therefore not be visible if
+    * {@code browseOnly} is {@code true}.
+    * <p>
+    * If <code>browseOnly</code> is <code>false</code>, the ClientConsumer will behave like consume
+    * the messages from the queue and the messages will effectively be removed from the queue.
+    *
+    * @param queueName  name of the queue to consume messages from
+    * @param filter     only messages which match this filter will be consumed
+    * @param windowSize the consumer window size
+    * @param maxRate    the maximum rate to consume messages
+    * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages.
+    * @return a ClientConsumer
+    * @throws ActiveMQException if an exception occurs while creating the ClientConsumer
+    */
+   ClientConsumer createConsumer(SimpleString queueName,
+                                 SimpleString filter,
+                                 int windowSize,
+                                 int maxRate,
+                                 boolean browseOnly) throws ActiveMQException;
+
+
+   /**
+    * Creates a ClientConsumer to consume or browse messages matching the filter from the queue with
+    * the given name.
+    * <p>
+    * If <code>browseOnly</code> is <code>true</code>, the ClientConsumer will receive the messages
+    * from the queue but they will not be consumed (the messages will remain in the queue). Note
+    * that paged messages will not be in the queue, and will therefore not be visible if
+    * {@code browseOnly} is {@code true}.
+    * <p>
+    * If <code>browseOnly</code> is <code>false</code>, the ClientConsumer will behave like consume
+    * the messages from the queue and the messages will effectively be removed from the queue.
+    *
+    * @param queueName  name of the queue to consume messages from
+    * @param filter     only messages which match this filter will be consumed
+    * @param priority   the consumer priority
     * @param windowSize the consumer window size
     * @param maxRate    the maximum rate to consume messages
     * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages.
@@ -887,6 +939,7 @@ public interface ClientSession extends XAResource, AutoCloseable {
     */
    ClientConsumer createConsumer(SimpleString queueName,
                                  SimpleString filter,
+                                 int priority,
                                  int windowSize,
                                  int maxRate,
                                  boolean browseOnly) throws ActiveMQException;
index 4946efb..7f9ede3 100644 (file)
@@ -70,6 +70,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
 
    private final SimpleString filterString;
 
+   private final int priority;
+
    private final SimpleString queueName;
 
    private final boolean browseOnly;
@@ -141,6 +143,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
                              final ConsumerContext consumerContext,
                              final SimpleString queueName,
                              final SimpleString filterString,
+                             final int priority,
                              final boolean browseOnly,
                              final int initialWindow,
                              final int clientWindowSize,
@@ -157,6 +160,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
 
       this.filterString = filterString;
 
+      this.priority = priority;
+
       this.browseOnly = browseOnly;
 
       this.sessionContext = sessionContext;
@@ -563,6 +568,11 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
    }
 
    @Override
+   public int getPriority() {
+      return priority;
+   }
+
+   @Override
    public SimpleString getQueueName() {
       return queueName;
    }
index 8190821..55d30e7 100644 (file)
@@ -29,6 +29,8 @@ public interface ClientConsumerInternal extends ClientConsumer {
 
    SimpleString getFilterString();
 
+   int getPriority();
+
    boolean isBrowseOnly();
 
    void handleMessage(ClientMessageInternal message) throws Exception;
index 5efaed6..e8e39e7 100644 (file)
@@ -800,6 +800,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
    @Override
    public ClientConsumer createConsumer(final SimpleString queueName,
+                                        final SimpleString filterString,
+                                        final int priority,
+                                        final boolean browseOnly) throws ActiveMQException {
+      return createConsumer(queueName, filterString, priority, consumerWindowSize, consumerMaxRate, browseOnly);
+   }
+
+   @Override
+   public ClientConsumer createConsumer(final SimpleString queueName,
                                         final boolean browseOnly) throws ActiveMQException {
       return createConsumer(queueName, null, consumerWindowSize, consumerMaxRate, browseOnly);
    }
@@ -821,6 +829,15 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       return sessionContext.isWritable(callback);
    }
 
+   @Override
+   public ClientConsumer createConsumer(final SimpleString queueName,
+                                        final SimpleString filterString,
+                                        final int windowSize,
+                                        final int maxRate,
+                                        final boolean browseOnly) throws ActiveMQException {
+      return createConsumer(queueName, filterString, ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), windowSize, maxRate, browseOnly);
+   }
+
    /**
     * Note, we DO NOT currently support direct consumers (i.e. consumers where delivery occurs on
     * the remoting thread).
@@ -834,10 +851,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
    @Override
    public ClientConsumer createConsumer(final SimpleString queueName,
                                         final SimpleString filterString,
+                                        final int priority,
                                         final int windowSize,
                                         final int maxRate,
                                         final boolean browseOnly) throws ActiveMQException {
-      return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly);
+      return internalCreateConsumer(queueName, filterString, priority, windowSize, maxRate, browseOnly);
    }
 
    @Override
@@ -1931,12 +1949,13 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
     */
    private ClientConsumer internalCreateConsumer(final SimpleString queueName,
                                                  final SimpleString filterString,
+                                                 final int priority,
                                                  final int windowSize,
                                                  final int maxRate,
                                                  final boolean browseOnly) throws ActiveMQException {
       checkClosed();
 
-      ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor);
+      ClientConsumerInternal consumer = sessionContext.createConsumer(queueName, filterString, priority, windowSize, maxRate, ackBatchSize, browseOnly, executor, flowControlExecutor);
 
       addConsumer(consumer);
 
index 74d9847..fac76e4 100644 (file)
@@ -41,6 +41,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
       return  (version > 0 && version < PacketImpl.ASYNC_RESPONSE_CHANGE_VERSION);
    }
 
+   default boolean isVersionSupportConsumerPriority() {
+      int version = getChannelVersion();
+      return  version >= PacketImpl.CONSUMER_PRIORITY_CHANGE_VERSION;
+   }
+
    /**
     * Sets the client protocol used on the communication. This will determine if the client has
     * support for certain packet types
index ccf10ab..bfe8ec0 100644 (file)
@@ -367,6 +367,7 @@ public class ActiveMQSessionContext extends SessionContext {
    @Override
    public ClientConsumerInternal createConsumer(SimpleString queueName,
                                                 SimpleString filterString,
+                                                int priority,
                                                 int windowSize,
                                                 int maxRate,
                                                 int ackBatchSize,
@@ -377,7 +378,7 @@ public class ActiveMQSessionContext extends SessionContext {
 
       ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
 
-      SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
+      SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, priority, browseOnly, true);
 
       SessionQueueQueryResponseMessage queueInfo;
 
@@ -392,7 +393,7 @@ public class ActiveMQSessionContext extends SessionContext {
       // The value we send is just a hint
       final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize;
 
-      return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
+      return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
    }
 
    @Override
@@ -875,7 +876,7 @@ public class ActiveMQSessionContext extends SessionContext {
          sendPacketWithoutLock(sessionChannel, createQueueRequest);
       }
 
-      SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), consumerInternal.getQueueName(), consumerInternal.getFilterString(), consumerInternal.isBrowseOnly(), false);
+      SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(getConsumerID(consumerInternal), consumerInternal.getQueueName(), consumerInternal.getFilterString(), consumerInternal.getPriority(), consumerInternal.isBrowseOnly(), false);
 
       sendPacketWithoutLock(sessionChannel, createConsumerRequest);
 
index 9a8166e..f576fea 100644 (file)
@@ -23,6 +23,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CheckFailoverMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CheckFailoverReplyMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
@@ -39,8 +40,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectMessage_V2;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Ping;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
@@ -91,7 +92,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
 
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CHECK_FOR_FAILOVER;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_TOPOLOGY;
index 0168a47..62bf423 100644 (file)
@@ -33,7 +33,9 @@ public class PacketImpl implements Packet {
    public static final int ADDRESSING_CHANGE_VERSION = 129;
 
    // 2.7.0
-   public static final int ASYNC_RESPONSE_CHANGE_VERSION = 130;
+   public static final int ARTEMIS_2_7_0_VERSION = 130;
+   public static final int ASYNC_RESPONSE_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION;
+   public static final int CONSUMER_PRIORITY_CHANGE_VERSION = ARTEMIS_2_7_0_VERSION;
 
 
    public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
@@ -318,7 +320,7 @@ public class PacketImpl implements Packet {
 
       encodeHeader(buffer);
 
-      encodeRest(buffer);
+      encodeRest(buffer, connection);
 
       encodeSize(buffer);
 
@@ -394,6 +396,10 @@ public class PacketImpl implements Packet {
    public void encodeRest(final ActiveMQBuffer buffer) {
    }
 
+   public void encodeRest(final ActiveMQBuffer buffer, final CoreRemotingConnection coreRemotingConnection) {
+      encodeRest(buffer);
+   }
+
    public void decodeRest(final ActiveMQBuffer buffer) {
    }
 
index e07b50c..8db65b8 100644 (file)
  */
 package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 
 public class SessionCreateConsumerMessage extends QueueAbstractPacket {
 
@@ -25,6 +27,8 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
 
    private SimpleString filterString;
 
+   private int priority;
+
    private boolean browseOnly;
 
    private boolean requiresResponse;
@@ -32,6 +36,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
    public SessionCreateConsumerMessage(final long id,
                                        final SimpleString queueName,
                                        final SimpleString filterString,
+                                       final int priority,
                                        final boolean browseOnly,
                                        final boolean requiresResponse) {
       super(SESS_CREATECONSUMER);
@@ -39,6 +44,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
       this.id = id;
       this.queueName = queueName;
       this.filterString = filterString;
+      this.priority = priority;
       this.browseOnly = browseOnly;
       this.requiresResponse = requiresResponse;
    }
@@ -55,6 +61,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
       buff.append(", id=" + id);
       buff.append(", browseOnly=" + browseOnly);
       buff.append(", requiresResponse=" + requiresResponse);
+      buff.append(", priority=" + priority);
       buff.append("]");
       return buff.toString();
    }
@@ -67,6 +74,10 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
       return filterString;
    }
 
+   public int getPriority() {
+      return priority;
+   }
+
    public boolean isBrowseOnly() {
       return browseOnly;
    }
@@ -84,17 +95,25 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
       this.filterString = filterString;
    }
 
+   public void setPriority(byte priority) {
+      this.priority = priority;
+   }
+
    public void setBrowseOnly(boolean browseOnly) {
       this.browseOnly = browseOnly;
    }
 
    @Override
-   public void encodeRest(final ActiveMQBuffer buffer) {
+   public void encodeRest(final ActiveMQBuffer buffer, final CoreRemotingConnection coreRemotingConnection) {
       buffer.writeLong(id);
       buffer.writeSimpleString(queueName);
       buffer.writeNullableSimpleString(filterString);
       buffer.writeBoolean(browseOnly);
       buffer.writeBoolean(requiresResponse);
+      //Priority Support added in 2.7.0
+      if (coreRemotingConnection.isVersionSupportConsumerPriority()) {
+         buffer.writeInt(priority);
+      }
    }
 
    @Override
@@ -104,6 +123,12 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
       filterString = buffer.readNullableSimpleString();
       browseOnly = buffer.readBoolean();
       requiresResponse = buffer.readBoolean();
+      //Priority Support Added in 2.7.0
+      if (buffer.readableBytes() > 0) {
+         priority = buffer.readInt();
+      } else {
+         priority = ActiveMQDefaultConfiguration.getDefaultConsumerPriority();
+      }
    }
 
    @Override
@@ -113,6 +138,7 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
       result = prime * result + (browseOnly ? 1231 : 1237);
       result = prime * result + ((filterString == null) ? 0 : filterString.hashCode());
       result = prime * result + (int) (id ^ (id >>> 32));
+      result = prime * result + priority;
       result = prime * result + ((queueName == null) ? 0 : queueName.hashCode());
       result = prime * result + (requiresResponse ? 1231 : 1237);
       return result;
@@ -134,6 +160,8 @@ public class SessionCreateConsumerMessage extends QueueAbstractPacket {
             return false;
       } else if (!filterString.equals(other.filterString))
          return false;
+      if (priority != other.priority)
+         return false;
       if (id != other.id)
          return false;
       if (queueName == null) {
index e25441b..9335204 100644 (file)
@@ -314,6 +314,7 @@ public abstract class SessionContext {
 
    public abstract ClientConsumerInternal createConsumer(SimpleString queueName,
                                                          SimpleString filterString,
+                                                         int priority,
                                                          int windowSize,
                                                          int maxRate,
                                                          int ackBatchSize,
index 693ab84..60f5d8b 100644 (file)
@@ -51,6 +51,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.QueueAttributes;
@@ -710,7 +711,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
             }
          }
 
-         consumer = session.createConsumer(queueName, null, false);
+         consumer = createClientConsumer(dest, queueName, null);
 
          ActiveMQMessageConsumer jbc = new ActiveMQMessageConsumer(options, connection, this, consumer, false, dest, selectorString, autoDeleteQueueName);
 
@@ -779,7 +780,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
 
             connection.addKnownDestination(dest.getSimpleAddress());
 
-            consumer = session.createConsumer(dest.getSimpleAddress(), coreFilterString, false);
+            consumer = createClientConsumer(dest, null, coreFilterString);
          } else {
             AddressQuery response = session.addressQuery(dest.getSimpleAddress());
 
@@ -804,8 +805,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
 
                createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
 
-               consumer = session.createConsumer(queueName, null, false);
-
+               consumer = createClientConsumer(dest, queueName, null);
                autoDeleteQueueName = queueName;
             } else {
                // Durable sub
@@ -860,7 +860,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
                   }
                }
 
-               consumer = session.createConsumer(queueName, null, false);
+               consumer = createClientConsumer(dest, queueName, null);
             }
          }
 
@@ -874,6 +874,12 @@ public class ActiveMQSession implements QueueSession, TopicSession {
       }
    }
 
+   private ClientConsumer createClientConsumer(ActiveMQDestination destination, SimpleString queueName, SimpleString coreFilterString) throws ActiveMQException {
+      QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
+      int priority = queueAttributes.getConsumerPriority() == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : queueAttributes.getConsumerPriority();
+      return session.createConsumer(queueName == null ? destination.getSimpleAddress() : queueName, coreFilterString, priority, false);
+   }
+
    public void ackAllConsumers() throws JMSException {
       checkClosed();
    }
index c364387..f850cc1 100644 (file)
  */
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
+import java.util.Map;
 import java.util.concurrent.Executor;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
@@ -80,6 +82,8 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private static final Logger logger = Logger.getLogger(AMQPSessionCallback.class);
 
+   private static final Symbol PRIORITY = Symbol.getSymbol("priority");
+
    protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
 
    private final AMQPConnectionCallback protonSPI;
@@ -199,7 +203,9 @@ public class AMQPSessionCallback implements SessionCallback {
 
       filter = SelectorTranslator.convertToActiveMQFilterString(filter);
 
-      ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), browserOnly, false, null);
+      int priority = getPriority(protonSender.getSender().getRemoteProperties());
+
+      ServerConsumer consumer = serverSession.createConsumer(consumerID, queue, SimpleString.toSimpleString(filter), priority, browserOnly, false, null);
 
       // AMQP handles its own flow control for when it's started
       consumer.setStarted(true);
@@ -209,6 +215,11 @@ public class AMQPSessionCallback implements SessionCallback {
       return consumer;
    }
 
+   private int getPriority(Map<Symbol, Object> properties) {
+      Number value = properties == null ? null : (Number) properties.get(PRIORITY);
+      return value == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : value.intValue();
+   }
+
    public void startSender(Object brokerConsumer) throws Exception {
       ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer;
       // flow control is done at proton
index 235d699..4ae4968 100644 (file)
@@ -78,6 +78,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
    @Override
    public ClientConsumerInternal createConsumer(SimpleString queueName,
                                                 SimpleString filterString,
+                                                int priority,
                                                 int windowSize,
                                                 int maxRate,
                                                 int ackBatchSize,
@@ -88,7 +89,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
 
       ActiveMQConsumerContext consumerContext = new ActiveMQConsumerContext(consumerID);
 
-      SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, browseOnly, true);
+      SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(consumerID, queueName, filterString, priority, browseOnly, true);
 
       SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage) getSessionChannel().sendBlocking(request, PacketImpl.SESS_QUEUEQUERY_RESP);
 
@@ -96,7 +97,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
       // could be overridden on the queue settings
       // The value we send is just a hint
 
-      return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
+      return new ClientConsumerImpl(session, consumerContext, queueName, filterString, priority, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
    }
 
 }
index 45d9fa1..c35bc64 100644 (file)
@@ -137,11 +137,10 @@ public class AMQConsumer {
       }
 
       SimpleString destinationName = new SimpleString(session.convertWildcard(openwireDestination.getPhysicalName()));
-
       if (openwireDestination.isTopic()) {
          SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), destinationName.toString(), info.getSubscriptionName(), selector, destinationName);
 
-         serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
+         serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.getPriority(), info.isBrowser(), false, -1);
          serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
          //only advisory topic consumers need this.
          ((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck);
@@ -151,7 +150,7 @@ public class AMQConsumer {
          } catch (ActiveMQQueueExistsException e) {
             // ignore
          }
-         serverConsumer = session.getCoreSession().createConsumer(nativeId, destinationName, selector, info.isBrowser(), false, -1);
+         serverConsumer = session.getCoreSession().createConsumer(nativeId, destinationName, selector, info.getPriority(), info.isBrowser(), false, -1);
          serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
          AddressSettings addrSettings = session.getCoreServer().getAddressSettingsRepository().getMatch(destinationName.toString());
          if (addrSettings != null) {
index 3b68d8b..faefa39 100644 (file)
@@ -36,13 +36,14 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
@@ -83,7 +84,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAR
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXASetTimeoutResponseMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
-import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
@@ -323,7 +323,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                case SESS_CREATECONSUMER: {
                   SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
                   requiresResponse = request.isRequiresResponse();
-                  session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.isBrowseOnly());
+                  session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.getPriority(), request.isBrowseOnly(), true, null);
                   if (requiresResponse) {
                      // We send back queue information on the queue as a response- this allows the queue to
                      // be automatically recreated on failover
index 1dfff29..babddc2 100644 (file)
@@ -18,10 +18,12 @@ package org.apache.activemq.artemis.core.server;
 
 import java.util.List;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.core.PriorityAware;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 
-public interface Consumer {
+public interface Consumer extends PriorityAware {
 
    /**
     *
@@ -85,6 +87,11 @@ public interface Consumer {
    /** an unique sequential ID for this consumer */
    long sequentialID();
 
+   @Override
+   default int getPriority() {
+      return ActiveMQDefaultConfiguration.getDefaultConsumerPriority();
+   }
+
    default void errorProcessing(Throwable e, MessageReference reference) {
 
    }
index cfc3e01..3ba3384 100644 (file)
@@ -107,7 +107,15 @@ public interface ServerSession extends SecurityAuth {
 
    void addCloseable(Closeable closeable);
 
-   /**
+   ServerConsumer createConsumer(long consumerID,
+                                 SimpleString queueName,
+                                 SimpleString filterString,
+                                 int priority,
+                                 boolean browseOnly,
+                                 boolean supportLargeMessage,
+                                 Integer credits) throws Exception;
+
+    /**
     * To be used by protocol heads that needs to control the transaction outside the session context.
     */
    void resetTX(Transaction transaction);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java
new file mode 100644 (file)
index 0000000..895b1ee
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.RepeatableIterator;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Set;
+
+public interface QueueConsumers<T extends PriorityAware> extends Iterable<T>, RepeatableIterator<T>, ResettableIterator<T> {
+
+   Set<Integer> getPriorites();
+
+   boolean add(T t);
+
+   boolean remove(T t);
+
+   int size();
+
+   boolean isEmpty();
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
new file mode 100644 (file)
index 0000000..0495c51
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.UpdatableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able and are created on delegating iterator().
+ *
+ * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
+ *         but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl<T extends PriorityAware> implements QueueConsumers<T> {
+
+   private final PriorityCollection<T> consumers = new PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection<T> unmodifiableConsumers = Collections.unmodifiableCollection(consumers);
+   private UpdatableIterator<T> iterator = new UpdatableIterator<>(consumers.resettableIterator());
+
+   //-- START :: ResettableIterator Methods
+   // As any iterator, these are not thread-safe and should ONLY be called by a single thread at a time.
+
+   @Override
+   public boolean hasNext() {
+      return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+      return iterator.next();
+   }
+
+   @Override
+   public void repeat() {
+      iterator.repeat();
+   }
+
+   @Override
+   public void reset() {
+      iterator.reset();
+   }
+
+   //-- END :: ResettableIterator Methods
+
+
+   @Override
+   public boolean add(T t) {
+      boolean result = consumers.add(t);
+      if (result) {
+         iterator.update(consumers.resettableIterator());
+      }
+      return result;
+   }
+
+   @Override
+   public boolean remove(T t) {
+      boolean result = consumers.remove(t);
+      if (result) {
+         iterator.update(consumers.resettableIterator());
+      }
+      return result;
+   }
+
+   @Override
+   public int size() {
+      return consumers.size();
+   }
+
+   @Override
+   public boolean isEmpty() {
+      return consumers.isEmpty();
+   }
+
+   @Override
+   public Iterator<T> iterator() {
+      return unmodifiableConsumers.iterator();
+   }
+
+   @Override
+   public void forEach(Consumer<? super T> action) {
+      unmodifiableConsumers.forEach(action);
+   }
+
+   @Override
+   public Spliterator<T> spliterator() {
+      return unmodifiableConsumers.spliterator();
+   }
+
+   @Override
+   public Set<Integer> getPriorites() {
+      return consumers.getPriorites();
+   }
+
+}
index 521ece3..9f3b53b 100644 (file)
@@ -21,7 +21,6 @@ import java.io.StringWriter;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -29,9 +28,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -42,6 +41,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNullRefException;
@@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.HandleStatus;
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.PriorityAware;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -98,6 +99,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
+import org.apache.activemq.artemis.utils.collections.SingletonIterator;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
 import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
@@ -184,11 +186,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private final QueuePendingMessageMetrics deliveringMetrics = new QueuePendingMessageMetrics(this);
 
-   // used to control if we should recalculate certain positions inside deliverAsync
-   private volatile boolean consumersChanged = true;
-
-   private final List<ConsumerHolder> consumerList = new CopyOnWriteArrayList<>();
-
    private final ScheduledDeliveryHandler scheduledDeliveryHandler;
 
    private AtomicLong messagesAdded = new AtomicLong(0);
@@ -235,14 +232,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    private final AtomicInteger consumersCount = new AtomicInteger();
 
    private volatile long consumerRemovedTimestamp = -1;
-
-   private final Set<Consumer> consumerSet = new HashSet<>();
+   private final QueueConsumers<ConsumerHolder<? extends Consumer>> consumers = new QueueConsumersImpl<>();
 
    private final Map<SimpleString, Consumer> groups = new HashMap<>();
 
-   private volatile SimpleString expiryAddress;
+   private volatile Consumer exclusiveConsumer;
 
-   private int pos;
+   private volatile SimpleString expiryAddress;
 
    private final ArtemisExecutor executor;
 
@@ -327,7 +323,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       out.println("queueMemorySize=" + queueMemorySize);
 
-      for (ConsumerHolder holder : consumerList) {
+      for (ConsumerHolder holder : consumers) {
          out.println("consumer: " + holder.consumer.debug());
       }
 
@@ -568,6 +564,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    @Override
    public synchronized void setExclusive(boolean exclusive) {
       this.exclusive = exclusive;
+      if (!exclusive) {
+         exclusiveConsumer = null;
+      }
    }
 
    @Override
@@ -1032,22 +1031,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       enterCritical(CRITICAL_CONSUMER);
       try {
          synchronized (this) {
-            if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumersCount.get() >= maxConsumers) {
+            if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumers.size() >= maxConsumers) {
                throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
             }
 
-            consumersChanged = true;
-
             if (!consumer.supportsDirectDelivery()) {
                this.supportsDirectDeliver = false;
             }
 
             cancelRedistributor();
-
-            consumerList.add(new ConsumerHolder(consumer));
-
-            if (consumerSet.add(consumer)) {
-               int currentConsumerCount = consumersCount.incrementAndGet();
+            ConsumerHolder<Consumer> newConsumerHolder = new ConsumerHolder<>(consumer);
+            if (consumers.add(newConsumerHolder)) {
+               int currentConsumerCount = consumers.size();
                if (delayBeforeDispatch >= 0) {
                   dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis());
                }
@@ -1075,33 +1070,33 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       enterCritical(CRITICAL_CONSUMER);
       try {
          synchronized (this) {
-            consumersChanged = true;
 
-            for (ConsumerHolder holder : consumerList) {
+            boolean consumerRemoved = false;
+            for (ConsumerHolder holder : consumers) {
                if (holder.consumer == consumer) {
                   if (holder.iter != null) {
                      holder.iter.close();
                   }
-                  consumerList.remove(holder);
+                  consumers.remove(holder);
+                  consumerRemoved = true;
                   break;
                }
             }
 
             this.supportsDirectDeliver = checkConsumerDirectDeliver();
 
-            if (pos > 0 && pos >= consumerList.size()) {
-               pos = consumerList.size() - 1;
-            }
-
-            if (consumerSet.remove(consumer)) {
-               int currentConsumerCount = consumersCount.decrementAndGet();
+            if (consumerRemoved) {
                consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis());
-               boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(currentConsumerCount != 0));
+               boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(consumers.size() != 0));
                if (stopped) {
                   dispatchStartTimeUpdater.set(this, -1);
                }
             }
 
+            if (consumer == exclusiveConsumer) {
+               exclusiveConsumer = null;
+            }
+
             LinkedList<SimpleString> groupsToRemove = null;
 
             for (SimpleString groupID : groups.keySet()) {
@@ -1134,7 +1129,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private boolean checkConsumerDirectDeliver() {
       boolean supports = true;
-      for (ConsumerHolder consumerCheck : consumerList) {
+      for (ConsumerHolder consumerCheck : consumers) {
          if (!consumerCheck.consumer.supportsDirectDelivery()) {
             supports = false;
          }
@@ -1157,7 +1152,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
 
       if (delay > 0) {
-         if (consumerSet.isEmpty()) {
+         if (consumers.isEmpty()) {
             DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);
 
             redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
@@ -1194,7 +1189,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public int getConsumerCount() {
-      return consumersCount.get();
+      return consumers.size();
    }
 
    @Override
@@ -1203,8 +1198,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public synchronized Set<Consumer> getConsumers() {
-      return new HashSet<>(consumerSet);
+   public Set<Consumer> getConsumers() {
+      Set<Consumer> consumersSet = new HashSet<>(this.consumers.size());
+      for (ConsumerHolder<? extends Consumer> consumerHolder : consumers) {
+         consumersSet.add(consumerHolder.consumer);
+      }
+      return consumersSet;
    }
 
    @Override
@@ -1229,7 +1228,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public boolean hasMatchingConsumer(final Message message) {
-      for (ConsumerHolder holder : consumerList) {
+      for (ConsumerHolder holder : consumers) {
          Consumer consumer = holder.consumer;
 
          if (consumer instanceof Redistributor) {
@@ -1376,18 +1375,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public Map<String, List<MessageReference>> getDeliveringMessages() {
-
-      List<ConsumerHolder> consumerListClone = cloneConsumersList();
+      final Iterator<ConsumerHolder<? extends Consumer>> consumerHolderIterator;
+      synchronized (this) {
+         consumerHolderIterator = redistributor == null ? consumers.iterator() : SingletonIterator.newInstance(redistributor);
+      }
 
       Map<String, List<MessageReference>> mapReturn = new HashMap<>();
 
-      for (ConsumerHolder holder : consumerListClone) {
+      while (consumerHolderIterator.hasNext()) {
+         ConsumerHolder holder = consumerHolderIterator.next();
          List<MessageReference> msgs = holder.consumer.getDeliveringMessages();
          if (msgs != null && msgs.size() > 0) {
             mapReturn.put(holder.consumer.toManagementString(), msgs);
          }
       }
-
       return mapReturn;
    }
 
@@ -1867,7 +1868,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          postOffice.removeBinding(name, tx, true);
 
          if (removeConsumers) {
-            for (ConsumerHolder consumerHolder : consumerList) {
+            for (ConsumerHolder consumerHolder : consumers) {
                consumerHolder.consumer.disconnect();
             }
          }
@@ -2239,7 +2240,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public synchronized void resetAllIterators() {
-      for (ConsumerHolder holder : this.consumerList) {
+      for (ConsumerHolder holder : this.consumers) {
          holder.resetIterator();
       }
       if (redistributor != null) {
@@ -2420,14 +2421,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       // Either the iterator is empty or the consumer is busy
       int noDelivery = 0;
 
-      int size = 0;
-
-      int endPos = -1;
-
       int handled = 0;
 
       long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
-
+      consumers.reset();
       while (true) {
          if (handled == MAX_DELIVERIES_IN_LOOP) {
             // Schedule another one - we do this to prevent a single thread getting caught up in this loop for too
@@ -2465,21 +2462,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
             ConsumerHolder<? extends Consumer> holder;
             if (redistributor == null) {
-
-               if (endPos < 0 || consumersChanged) {
-                  consumersChanged = false;
-
-                  size = consumerList.size();
-
-                  endPos = pos - 1;
-
-                  if (endPos < 0) {
-                     endPos = size - 1;
-                     noDelivery = 0;
-                  }
+               if (consumers.hasNext()) {
+                  holder = consumers.next();
+               } else {
+                  break;
                }
-
-               holder = consumerList.get(pos);
             } else {
                holder = redistributor;
             }
@@ -2508,7 +2495,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
 
                   handled++;
-
+                  consumers.reset();
                   continue;
                }
 
@@ -2516,37 +2503,28 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   logger.trace("Queue " + this.getName() + " is delivering reference " + ref);
                }
 
-               // If a group id is set, then this overrides the consumer chosen round-robin
-
-               SimpleString groupID = extractGroupID(ref);
-
-               if (groupID != null) {
-                  groupConsumer = groups.get(groupID);
+               final SimpleString groupID = extractGroupID(ref);
+               groupConsumer = getGroupConsumer(groupID);
 
-                  if (groupConsumer != null) {
-                     consumer = groupConsumer;
-                  }
-               }
-
-               if (exclusive && redistributor == null) {
-                  consumer = consumerList.get(0).consumer;
+               if (groupConsumer != null) {
+                  consumer = groupConsumer;
                }
 
                HandleStatus status = handle(ref, consumer);
 
                if (status == HandleStatus.HANDLED) {
 
-                  deliveriesInTransit.countUp();
-
-                  handledconsumer = consumer;
-
-                  removeMessageReference(holder, ref);
-
                   if (redistributor == null) {
                      handleMessageGroup(ref, consumer, groupConsumer, groupID);
                   }
 
+                  deliveriesInTransit.countUp();
+
+
+                  removeMessageReference(holder, ref);
+                  handledconsumer = consumer;
                   handled++;
+                  consumers.reset();
                } else if (status == HandleStatus.BUSY) {
                   try {
                      holder.iter.repeat();
@@ -2561,18 +2539,19 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   noDelivery++;
                } else if (status == HandleStatus.NO_MATCH) {
                   // nothing to be done on this case, the iterators will just jump next
+                  consumers.reset();
                }
             }
 
-            if (redistributor != null || groupConsumer != null || exclusive) {
+            if (redistributor != null || groupConsumer != null) {
                if (noDelivery > 0) {
                   break;
                }
                noDelivery = 0;
-            } else if (pos == endPos) {
+            } else if (!consumers.hasNext()) {
                // Round robin'd all
 
-               if (noDelivery == size) {
+               if (noDelivery == this.consumers.size()) {
                   if (handledconsumer != null) {
                      // this shouldn't really happen,
                      // however I'm keeping this as an assertion case future developers ever change the logic here on this class
@@ -2587,16 +2566,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
                noDelivery = 0;
             }
-
-            // Only move onto the next position if the consumer on the current position was used.
-            // When using group we don't need to load balance to the next position
-            if (redistributor == null && !exclusive && groupConsumer == null) {
-               pos++;
-            }
-
-            if (pos >= size) {
-               pos = 0;
-            }
          }
 
          if (handledconsumer != null) {
@@ -2637,7 +2606,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          return null;
       } else {
          try {
-            // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
             return ref.getMessage().getGroupID();
          } catch (Throwable e) {
             ActiveMQServerLogger.LOGGER.unableToExtractGroupID(e);
@@ -2738,15 +2706,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private void internalAddRedistributor(final ArtemisExecutor executor) {
       // create the redistributor only once if there are no local consumers
-      if (consumerSet.isEmpty() && redistributor == null) {
+      if (consumers.isEmpty() && redistributor == null) {
          if (logger.isTraceEnabled()) {
             logger.trace("QueueImpl::Adding redistributor on queue " + this.toString());
          }
 
          redistributor = (new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE)));
 
-         consumersChanged = true;
-
          redistributor.consumer.start();
 
          deliverAsync();
@@ -3078,9 +3044,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    private boolean deliverDirect(final MessageReference ref) {
       synchronized (this) {
          if (!supportsDirectDeliver) {
-            // this should never happen, but who knows?
-            // if someone ever change add and removeConsumer,
-            // this would protect any eventual bug
             return false;
          }
          if (paused || !canDispatch() && redistributor == null) {
@@ -3091,45 +3054,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             return true;
          }
 
-         int startPos = pos;
+         consumers.reset();
 
-         int size = consumerList.size();
-
-         while (true) {
-            ConsumerHolder<? extends Consumer> holder;
-            if (redistributor == null) {
-               holder = consumerList.get(pos);
-            } else {
-               holder = redistributor;
-            }
+         while (consumers.hasNext() || redistributor != null) {
 
+            ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
             Consumer consumer = holder.consumer;
 
-            Consumer groupConsumer = null;
-
-            // If a group id is set, then this overrides the consumer chosen round-robin
+            final SimpleString groupID = extractGroupID(ref);
+            Consumer groupConsumer = getGroupConsumer(groupID);
 
-            SimpleString groupID = extractGroupID(ref);
-
-            if (groupID != null) {
-               groupConsumer = groups.get(groupID);
-
-               if (groupConsumer != null) {
-                  consumer = groupConsumer;
-               }
-            }
-
-            if (exclusive && redistributor == null) {
-               consumer = consumerList.get(0).consumer;
-            }
-
-            // Only move onto the next position if the consumer on the current position was used.
-            if (redistributor == null && !exclusive && groupConsumer == null) {
-               pos++;
-            }
-
-            if (pos == size) {
-               pos = 0;
+            if (groupConsumer != null) {
+               consumer = groupConsumer;
             }
 
             HandleStatus status = handle(ref, consumer);
@@ -3144,11 +3080,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
                deliveriesInTransit.countUp();
                proceedDeliver(consumer, ref);
+               consumers.reset();
                return true;
             }
 
-            if (pos == startPos || redistributor != null || groupConsumer != null || exclusive) {
-               // Tried them all
+            if (redistributor != null || groupConsumer != null) {
                break;
             }
          }
@@ -3160,13 +3096,34 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
+   private Consumer getGroupConsumer(SimpleString groupID) {
+      Consumer groupConsumer = null;
+      if (exclusive) {
+         // If exclusive is set, then this overrides the consumer chosen round-robin
+         groupConsumer = exclusiveConsumer;
+      } else {
+         // If a group id is set, then this overrides the consumer chosen round-robin
+         if (groupID != null) {
+            groupConsumer = groups.get(groupID);
+         }
+      }
+      return groupConsumer;
+   }
+
    private void handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) {
-      if (groupID != null) {
+      if (exclusive) {
+         if (groupConsumer == null) {
+            exclusiveConsumer = consumer;
+         }
+         consumers.repeat();
+      } else if (groupID != null) {
          if (extractGroupSequence(ref) == -1) {
             groups.remove(groupID);
-         }
-         if (groupConsumer == null) {
+            consumers.repeat();
+         } else if (groupConsumer == null) {
             groups.put(groupID, consumer);
+         } else {
+            consumers.repeat();
          }
       }
    }
@@ -3245,19 +3202,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return status;
    }
 
-   private List<ConsumerHolder> cloneConsumersList() {
-      List<ConsumerHolder> consumerListClone;
-
-      synchronized (this) {
-         if (redistributor == null) {
-            consumerListClone = new ArrayList<>(consumerList);
-         } else {
-            consumerListClone = Collections.singletonList(redistributor);
-         }
-      }
-      return consumerListClone;
-   }
-
    @Override
    public void postAcknowledge(final MessageReference ref) {
       QueueImpl queue = (QueueImpl) ref.getQueue();
@@ -3407,7 +3351,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    // Inner classes
    // --------------------------------------------------------------------------
 
-   protected static class ConsumerHolder<T extends  Consumer> {
+   protected static class ConsumerHolder<T extends Consumer> implements PriorityAware {
 
       ConsumerHolder(final T consumer) {
          this.consumer = consumer;
@@ -3424,6 +3368,27 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          iter = null;
       }
 
+      private Consumer consumer() {
+         return consumer;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+         if (this == o) return true;
+         if (o == null || getClass() != o.getClass()) return false;
+         ConsumerHolder<?> that = (ConsumerHolder<?>) o;
+         return Objects.equals(consumer, that.consumer);
+      }
+
+      @Override
+      public int hashCode() {
+         return Objects.hash(consumer);
+      }
+
+      @Override
+      public int getPriority() {
+         return consumer.getPriority();
+      }
    }
 
    private class DelayedAddRedistributor implements Runnable {
@@ -3745,19 +3710,19 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
          }
 
-         Set<Consumer> consumersSet = getConsumers();
 
-         if (consumersSet.size() == 0) {
+         if (consumers.size() == 0) {
             logger.debug("There are no consumers, no need to check slow consumer's rate");
             return;
-         } else if (queueRate < (threshold * consumersSet.size())) {
+         } else if (queueRate < (threshold * consumers.size())) {
             if (logger.isDebugEnabled()) {
                logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
             }
             return;
          }
 
-         for (Consumer consumer : consumersSet) {
+         for (ConsumerHolder consumerHolder : consumers) {
+            Consumer consumer = consumerHolder.consumer();
             if (consumer instanceof ServerConsumerImpl) {
                ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
                float consumerRate = serverConsumer.getRate();
index 477fae6..48e2f06 100644 (file)
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -88,6 +89,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    private final Filter filter;
 
+   private final int priority;
+
    private final int minLargeMessageSize;
 
    private final ServerSession session;
@@ -194,12 +197,32 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                              final boolean supportLargeMessage,
                              final Integer credits,
                              final ActiveMQServer server) throws Exception {
+      this(id, session, binding, filter, ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
+   }
+
+   public ServerConsumerImpl(final long id,
+                             final ServerSession session,
+                             final QueueBinding binding,
+                             final Filter filter,
+                             final int priority,
+                             final boolean started,
+                             final boolean browseOnly,
+                             final StorageManager storageManager,
+                             final SessionCallback callback,
+                             final boolean preAcknowledge,
+                             final boolean strictUpdateDeliveryCount,
+                             final ManagementService managementService,
+                             final boolean supportLargeMessage,
+                             final Integer credits,
+                             final ActiveMQServer server) throws Exception {
       this.id = id;
 
       this.sequentialID = server.getStorageManager().generateID();
 
       this.filter = filter;
 
+      this.priority = priority;
+
       this.session = session;
 
       this.binding = binding;
@@ -502,6 +525,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    }
 
    @Override
+   public int getPriority() {
+      return priority;
+   }
+
+   @Override
    public SimpleString getFilterString() {
       return filter == null ? null : filter.getFilterString();
    }
index 53f41dd..56c60f5 100644 (file)
@@ -35,6 +35,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.Closeable;
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -455,6 +456,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                                         final boolean browseOnly,
                                         final boolean supportLargeMessage,
                                         final Integer credits) throws Exception {
+      return this.createConsumer(consumerID, queueName, filterString, ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), browseOnly, supportLargeMessage, credits);
+   }
+
+   @Override
+   public ServerConsumer createConsumer(final long consumerID,
+                                        final SimpleString queueName,
+                                        final SimpleString filterString,
+                                        final int priority,
+                                        final boolean browseOnly,
+                                        final boolean supportLargeMessage,
+                                        final Integer credits) throws Exception {
       final SimpleString unPrefixedQueueName = removePrefix(queueName);
 
       Binding binding = postOffice.getBinding(unPrefixedQueueName);
@@ -485,7 +497,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                filterString, browseOnly, supportLargeMessage));
       }
 
-      ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
+      ServerConsumer consumer = new ServerConsumerImpl(consumerID, this, (QueueBinding) binding, filter, priority, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits, server);
       consumers.put(consumer.getID(), consumer);
 
       if (server.hasBrokerConsumerPlugins()) {
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImplTest.java
new file mode 100644 (file)
index 0000000..1055af7
--- /dev/null
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class QueueConsumersImplTest {
+
+   private QueueConsumers<TestPriority> queueConsumers;
+
+   @Before
+   public void setUp() {
+      queueConsumers = new QueueConsumersImpl<>();
+   }
+
+   @Test
+   public void addTest() {
+      TestPriority testPriority = new TestPriority("hello", 0);
+      assertFalse(queueConsumers.hasNext());
+
+      queueConsumers.add(testPriority);
+      queueConsumers.reset();
+      assertTrue(queueConsumers.hasNext());
+
+      assertEquals(testPriority, queueConsumers.next());
+   }
+
+   @Test
+   public void removeTest() {
+      TestPriority testPriority = new TestPriority("hello", 0);
+      assertFalse(queueConsumers.hasNext());
+
+      queueConsumers.add(testPriority);
+      queueConsumers.reset();
+      assertTrue(queueConsumers.hasNext());
+
+      queueConsumers.remove(testPriority);
+      queueConsumers.reset();
+      assertFalse(queueConsumers.hasNext());
+
+      assertEquals(0, queueConsumers.getPriorites().size());
+      queueConsumers.remove(testPriority);
+      queueConsumers.remove(testPriority);
+
+   }
+
+
+
+   @Test
+   public void roundRobinTest() {
+      queueConsumers.add(new TestPriority("A", 127));
+      queueConsumers.add(new TestPriority("B", 127));
+      queueConsumers.add(new TestPriority("E", 0));
+      queueConsumers.add(new TestPriority("D", 20));
+      queueConsumers.add(new TestPriority("C", 127));
+      queueConsumers.reset();
+      assertTrue(queueConsumers.hasNext());
+
+      assertEquals("A", queueConsumers.next().getName());
+
+      //Reset iterator should mark start as current position
+      queueConsumers.reset();
+      assertTrue(queueConsumers.hasNext());
+      assertEquals("B", queueConsumers.next().getName());
+
+      assertTrue(queueConsumers.hasNext());
+      assertEquals("C", queueConsumers.next().getName());
+
+      //Expect another A as after reset, we started at B so after A we then expect the next level
+      assertTrue(queueConsumers.hasNext());
+      assertEquals("A", queueConsumers.next().getName());
+
+      assertTrue(queueConsumers.hasNext());
+      assertEquals("D", queueConsumers.next().getName());
+
+      assertTrue(queueConsumers.hasNext());
+      assertEquals("E", queueConsumers.next().getName());
+
+      //We have iterated all.
+      assertFalse(queueConsumers.hasNext());
+
+      //Reset to iterate again.
+      queueConsumers.reset();
+
+      //We expect the iteration to round robin from last returned at the level.
+      assertTrue(queueConsumers.hasNext());
+      assertEquals("B", queueConsumers.next().getName());
+
+
+   }
+
+
+
+
+
+   private class TestPriority implements PriorityAware {
+
+      private final int priority;
+      private final String name;
+
+      private TestPriority(String name, int priority) {
+         this.priority = priority;
+         this.name = name;
+      }
+
+      @Override
+      public int getPriority() {
+         return priority;
+      }
+
+      public String getName() {
+         return name;
+      }
+   }
+
+}
\ No newline at end of file
index d86ed85..0aa3085 100644 (file)
@@ -41,6 +41,7 @@
 * [Last-Value Queues](last-value-queues.md)
 * [Exclusive Queues](exclusive-queues.md)
 * [Message Grouping](message-grouping.md)
+* [Consumer Priority](consumer-priority.md)
 * [Extra Acknowledge Modes](pre-acknowledge.md)
 * [Management](management.md)
 * [Management Console](management-console.md)
diff --git a/docs/user-manual/en/consumer-priority.md b/docs/user-manual/en/consumer-priority.md
new file mode 100644 (file)
index 0000000..a7a5ca7
--- /dev/null
@@ -0,0 +1,45 @@
+# Consumer Priority
+
+Consumer priorities allow you to ensure that high priority consumers receive messages while they are active.
+
+Normally, active consumers connected to a queue receive messages from it in a round-robin fashion. When consumer priorities are in use, messages are delivered round-robin if multiple active consumers exist with the same high priority.
+
+Messages will only going to lower priority consumers when the high priority consumers do not have credit available to consume the message, or those high priority consumers have declined to accept the message (for instance because it does not meet the criteria of any selectors associated with the consumer).
+
+Where a consumer does not set, the default priority <b>0</b> is used.
+
+## Core 
+
+#### JMS Example
+
+
+When using the JMS Client you can set the priority to be used, by using address parameters when 
+creating the destination used by the consumer.
+
+```java
+Queue queue = session.createQueue("my.destination.name?consmer-priority=50");
+Topic topic = session.createTopic("my.destination.name?consmer-priority=50");
+
+consumer = session.createConsumer(queue);
+```
+
+The range of priority values is -2<sup>31</sup> to 2<sup>31</sup>-1.
+
+## OpenWire 
+
+####JMS Example
+
+The priority for a consumer is set using Destination Options as follows:
+
+```java
+queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
+consumer = session.createConsumer(queue);
+```
+
+Because of the limitation of OpenWire, the range of priority values is: 0 to 127. The highest priority is 127.
+
+## AMQP
+
+In AMQP 1.0 the priority of the consumer is set in the properties map of the attach frame where the broker side of the link represents the sending side of the link. 
+
+The key for the entry must be the literal string priority, and the value of the entry must be an integral number in the range -2<sup>31</sup> to 2<sup>31</sup>-1.
\ No newline at end of file
index fb4e4da..dc83ef7 100644 (file)
@@ -79,6 +79,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    private String selector;
    private boolean presettle;
    private boolean noLocal;
+   private Map<Symbol, Object> properties;
 
    private AsyncResult pullRequest;
    private AsyncResult stopRequest;
@@ -173,6 +174,14 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
       }
    }
 
+   public void setProperties(Map<Symbol, Object> properties) {
+      if (getEndpoint() != null) {
+         throw new IllegalStateException("Endpoint already established");
+      }
+
+      this.properties = properties;
+   }
+
    /**
     * Detach the receiver, a closed receiver will throw exceptions if any further send calls are
     * made.
@@ -782,7 +791,9 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
       } else {
          receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
       }
-
+      if (properties != null) {
+         receiver.setProperties(properties);
+      }
       setEndpoint(receiver);
 
       super.doOpen();
index 8c331ca..53d45e3 100644 (file)
@@ -306,6 +306,12 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
       return createReceiver(address, selector, noLocal, false);
    }
 
+   public AmqpReceiver createReceiver(String address,
+                                      String selector,
+                                      boolean noLocal,
+                                      boolean presettle) throws Exception {
+      return createReceiver(address, selector, noLocal, presettle, null);
+   }
    /**
     * Create a receiver instance using the given address
     *
@@ -313,13 +319,15 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
     * @param selector  the JMS selector to use for the subscription
     * @param noLocal   should the subscription have messages from its connection filtered.
     * @param presettle should the receiver be created with a settled sender mode.
+    * @param properties to set on the receiver
     * @return a newly created receiver that is ready for use.
     * @throws Exception if an error occurs while creating the receiver.
     */
    public AmqpReceiver createReceiver(String address,
                                       String selector,
                                       boolean noLocal,
-                                      boolean presettle) throws Exception {
+                                      boolean presettle,
+                                      Map<Symbol, Object> properties) throws Exception {
       checkClosed();
 
       final ClientFuture request = new ClientFuture();
@@ -330,6 +338,9 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
       if (selector != null && !selector.isEmpty()) {
          receiver.setSelector(selector);
       }
+      if (properties != null && !properties.isEmpty()) {
+         receiver.setProperties(properties);
+      }
 
       connection.getScheduler().execute(new Runnable() {
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
new file mode 100644 (file)
index 0000000..95011d3
--- /dev/null
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 30000)
+   public void testPriority() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      Map<Symbol, Object> properties1 = new HashMap<>();
+      properties1.put(Symbol.getSymbol("priority"), 5);
+      AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1);
+      receiver1.flow(100);
+
+      Map<Symbol, Object> properties2 = new HashMap<>();
+      properties2.put(Symbol.getSymbol("priority"), 50);
+      AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2);
+      receiver2.flow(100);
+
+      Map<Symbol, Object> properties3 = new HashMap<>();
+      properties3.put(Symbol.getSymbol("priority"), 10);
+      AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3);
+      receiver3.flow(100);
+
+      sendMessages(getQueueName(), 5);
+
+
+      for (int i = 0; i < 5; i++) {
+         AmqpMessage message1 = receiver1.receiveNoWait();
+         AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS);
+         AmqpMessage message3 = receiver3.receiveNoWait();
+         assertNotNull("did not receive message first time", message2);
+         assertEquals("MessageID:" + i, message2.getMessageId());
+         message2.accept();
+         assertNull("message is not meant to goto lower priority receiver", message1);
+         assertNull("message is not meant to goto lower priority receiver", message3);
+      }
+      assertNoMessage(receiver1);
+      assertNoMessage(receiver3);
+
+      //Close the high priority receiver
+      receiver2.close();
+
+      sendMessages(getQueueName(), 5);
+
+      //Check messages now goto next priority receiver
+      for (int i = 0; i < 5; i++) {
+         AmqpMessage message1 = receiver1.receiveNoWait();
+         AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS);
+         assertNotNull("did not receive message first time", message3);
+         assertEquals("MessageID:" + i, message3.getMessageId());
+         message3.accept();
+         assertNull("message is not meant to goto lower priority receiver", message1);
+      }
+      assertNoMessage(receiver1);
+
+
+      connection.close();
+   }
+
+   public void assertNoMessage(AmqpReceiver receiver) throws Exception {
+      //A check to make sure no messages
+      AmqpMessage message = receiver.receive(250, TimeUnit.MILLISECONDS);
+      assertNull("message is not meant to goto lower priority receiver", message);
+   }
+
+   @Test(timeout = 30000)
+   public void testPriorityProvidedAsByte() throws Exception {
+      testPriorityNumber((byte) 5);
+   }
+
+   @Test(timeout = 30000)
+   public void testPriorityProvidedAsUnsignedInteger() throws Exception {
+      testPriorityNumber(UnsignedInteger.valueOf(5));
+   }
+
+
+   private void testPriorityNumber(Number number) throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      Map<Symbol, Object> properties1 = new HashMap<>();
+      properties1.put(Symbol.getSymbol("priority"), number);
+      AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1);
+      receiver1.flow(100);
+
+      sendMessages(getQueueName(), 2);
+
+
+      for (int i = 0; i < 2; i++) {
+         AmqpMessage message1 = receiver1.receiveNoWait();
+         assertNotNull("did not receive message first time", message1);
+         assertEquals("MessageID:" + i, message1.getMessageId());
+         message1.accept();
+      }
+      connection.close();
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
new file mode 100644 (file)
index 0000000..47b9f56
--- /dev/null
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ * Consumer Priority Test
+ */
+public class ConsumerPriorityTest extends JMSTestBase {
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+
+   protected ConnectionFactory getCF() throws Exception {
+      return cf;
+   }
+
+   @Test
+   public void testConsumerPriorityQueueConsumerSettingUsingAddressQueueParameters() throws Exception {
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+      String queueName = getName();
+
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(queueName);
+         Queue queue1 = session.createQueue(queueName + "?consumer-priority=3");
+         Queue queue2 = session.createQueue(queueName + "?consumer-priority=2");
+         Queue queue3 = session.createQueue(queueName + "?consumer-priority=1");
+
+         assertEquals(queueName, queue.getQueueName());
+
+         ActiveMQDestination b = (ActiveMQDestination) queue1;
+         assertEquals(3, b.getQueueAttributes().getConsumerPriority().intValue());
+         ActiveMQDestination c = (ActiveMQDestination) queue2;
+         assertEquals(2, c.getQueueAttributes().getConsumerPriority().intValue());
+         ActiveMQDestination d = (ActiveMQDestination) queue3;
+         assertEquals(1, d.getQueueAttributes().getConsumerPriority().intValue());
+
+         MessageProducer producer = session.createProducer(queue);
+
+         MessageConsumer consumer1 = session.createConsumer(queue1);
+         MessageConsumer consumer2 = session.createConsumer(queue2);
+         MessageConsumer consumer3 = session.createConsumer(queue3);
+
+         connection.start();
+
+         for (int j = 0; j < 100; j++) {
+            TextMessage message = session.createTextMessage();
+
+            message.setText("Message" + j);
+
+            producer.send(message);
+         }
+
+
+         //All msgs should go to the first consumer
+         for (int j = 0; j < 100; j++) {
+            TextMessage tm = (TextMessage) consumer1.receive(10000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            tm = (TextMessage) consumer2.receiveNoWait();
+            assertNull(tm);
+            tm = (TextMessage) consumer3.receiveNoWait();
+            assertNull(tm);
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test
+   public void testConsumerPriorityQueueConsumerRoundRobin() throws Exception {
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+      String queueName = getName();
+
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(queueName);
+         Queue queue1 = session.createQueue(queueName + "?consumer-priority=3");
+         Queue queue2 = session.createQueue(queueName + "?consumer-priority=3");
+         Queue queue3 = session.createQueue(queueName + "?consumer-priority=1");
+
+
+         MessageProducer producer = session.createProducer(queue);
+
+         MessageConsumer consumer1 = session.createConsumer(queue1);
+         MessageConsumer consumer2 = session.createConsumer(queue2);
+         MessageConsumer consumer3 = session.createConsumer(queue3);
+
+         connection.start();
+
+         for (int j = 0; j < 100; j++) {
+            TextMessage message = session.createTextMessage();
+
+            message.setText("Message" + j);
+            message.setIntProperty("counter", j);
+            producer.send(message);
+         }
+
+
+         //All msgs should go to the first two consumers, round robin'd
+         for (int j = 0; j < 50; j += 2) {
+            TextMessage tm = (TextMessage) consumer1.receive(10000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            TextMessage tm2 = (TextMessage) consumer2.receive(10000);
+            assertNotNull(tm2);
+
+            assertEquals("Message" + (j + 1), tm2.getText());
+
+
+
+            TextMessage tm3 = (TextMessage) consumer3.receiveNoWait();
+            assertNull(tm3);
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test
+   public void testConsumerPriorityQueueConsumerFailover() throws Exception {
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+      String queueName = getName();
+
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(queueName);
+         Queue queue1 = session.createQueue(queueName + "?consumer-priority=3");
+         Queue queue2 = session.createQueue(queueName + "?consumer-priority=2");
+         Queue queue3 = session.createQueue(queueName + "?consumer-priority=1");
+
+
+         MessageProducer producer = session.createProducer(queue);
+
+         MessageConsumer consumer1 = session.createConsumer(queue1);
+         MessageConsumer consumer2 = session.createConsumer(queue2);
+         MessageConsumer consumer3 = session.createConsumer(queue3);
+
+         connection.start();
+
+         for (int j = 0; j < 100; j++) {
+            TextMessage message = session.createTextMessage();
+
+            message.setText("Message" + j);
+
+            producer.send(message);
+         }
+
+
+         //All msgs should go to the first consumer
+         for (int j = 0; j < 50; j++) {
+            TextMessage tm = (TextMessage) consumer1.receive(10000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            tm = (TextMessage) consumer2.receiveNoWait();
+            assertNull(tm);
+            tm = (TextMessage) consumer3.receiveNoWait();
+            assertNull(tm);
+         }
+         consumer1.close();
+
+         //All msgs should now go to the next consumer only, without any errors or exceptions
+         for (int j = 50; j < 100; j++) {
+            TextMessage tm = (TextMessage) consumer2.receive(10000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            tm = (TextMessage) consumer3.receiveNoWait();
+            assertNull(tm);
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+
+   @Test
+   public void testConsumerPriorityTopicSharedConsumerFailover() throws Exception {
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+      String topicName = getName();
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Destination topic = session.createTopic(topicName);
+         MessageProducer producer = session.createProducer(topic);
+
+         String subscriptionName = "sharedsub";
+         Topic topicConsumer1 = session.createTopic(topicName + "?consumer-priority=3");
+         Topic topicConsumer2 = session.createTopic(topicName + "?consumer-priority=2");
+         Topic topicConsumer3 = session.createTopic(topicName + "?consumer-priority=1");
+
+
+         MessageConsumer consumer1 = session.createSharedDurableConsumer(topicConsumer1, subscriptionName);
+         MessageConsumer consumer2 = session.createSharedDurableConsumer(topicConsumer2, subscriptionName);
+         MessageConsumer consumer3 = session.createSharedDurableConsumer(topicConsumer3, subscriptionName);
+
+         connection.start();
+
+         for (int j = 0; j < 100; j++) {
+            TextMessage message = session.createTextMessage();
+
+            message.setText("Message" + j);
+
+            producer.send(message);
+         }
+
+
+         //All msgs should go to the first consumer
+         for (int j = 0; j < 50; j++) {
+            TextMessage tm = (TextMessage) consumer1.receive(10000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            tm = (TextMessage) consumer2.receiveNoWait();
+            assertNull(tm);
+            tm = (TextMessage) consumer3.receiveNoWait();
+            assertNull(tm);
+         }
+         consumer1.close();
+
+         //All msgs should now go to the next consumer only, without any errors or exceptions
+         for (int j = 50; j < 100; j++) {
+            TextMessage tm = (TextMessage) consumer2.receive(10000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            tm = (TextMessage) consumer3.receiveNoWait();
+            assertNull(tm);
+         }
+
+
+      } finally {
+         connection.close();
+      }
+   }
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java
new file mode 100644 (file)
index 0000000..637e9d6
--- /dev/null
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+
+import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueueConsumerPriorityTest extends BasicOpenWireTest {
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      this.makeSureCoreQueueExist("QUEUE.A");
+   }
+   @Test
+   public void testQueueConsumerPriority() throws JMSException, InterruptedException {
+      connection.start();
+      Session consumerLowPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Session consumerHighPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      assertNotNull(consumerHighPriority);
+      Session senderSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      String queueName = "QUEUE.A";
+      ActiveMQQueue low = new ActiveMQQueue(queueName + "?consumer.priority=1");
+      MessageConsumer lowConsumer = consumerLowPriority.createConsumer(low);
+
+      ActiveMQQueue high = new ActiveMQQueue(queueName + "?consumer.priority=2");
+      MessageConsumer highConsumer = consumerLowPriority.createConsumer(high);
+
+      ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
+
+      MessageProducer producer = senderSession.createProducer(senderQueue);
+
+      Message msg = senderSession.createTextMessage("test");
+      for (int i = 0; i < 1000; i++) {
+         producer.send(msg);
+         assertNotNull("null on iteration: " + i, highConsumer.receive(1000));
+      }
+      assertNull(lowConsumer.receive(250));
+   }
+}
index 7a91c21..ecd0f78 100644 (file)
@@ -1175,6 +1175,11 @@ public class MessageHeaderTest extends MessageHeaderTestBase {
       }
 
       @Override
+      public ClientConsumer createConsumer(SimpleString queueName, SimpleString filter, int priority, boolean browseOnly) throws ActiveMQException {
+         return null;
+      }
+
+      @Override
       public ClientConsumer createConsumer(final SimpleString queueName,
                                            final SimpleString filterString,
                                            final int windowSize,
@@ -1184,6 +1189,11 @@ public class MessageHeaderTest extends MessageHeaderTestBase {
       }
 
       @Override
+      public ClientConsumer createConsumer(SimpleString queueName, SimpleString filter, int priority, int windowSize, int maxRate, boolean browseOnly) throws ActiveMQException {
+         return null;
+      }
+
+      @Override
       public ClientConsumer createConsumer(final String queueName) throws ActiveMQException {
          return null;
       }
index a68382f..a9a39c1 100644 (file)
@@ -746,6 +746,11 @@ public class LargeMessageBufferTest extends ActiveMQTestBase {
          return null;
       }
 
+      @Override
+      public int getPriority() {
+         return 0;
+      }
+
       public long getID() {
 
          return 0;