NIFI-5921 - Added property to allow a user to define a timeout on the ConsumeJMS...
authorRyan Whittington <ryan.whittington@gmail.com>
Wed, 2 Jan 2019 16:17:06 +0000 (16:17 +0000)
committerPierre Villard <pierre.villard.fr@gmail.com>
Fri, 4 Jan 2019 09:32:12 +0000 (10:32 +0100)
Co-Authored-By: rwhittington <ryan.whittington@gmail.com>
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #3240.

nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java

index ff05f6a..1f8358e 100644 (file)
@@ -48,6 +48,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Consuming JMS processor which upon each invocation of
@@ -128,6 +129,14 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
+    static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Timeout")
+            .description("How long to wait to consume a message from the remote broker before giving up.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 sec")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -154,6 +163,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
         _propertyDescriptors.add(DURABLE_SUBSCRIBER);
         _propertyDescriptors.add(SHARED_SUBSCRIBER);
         _propertyDescriptors.add(SUBSCRIPTION_NAME);
+        _propertyDescriptors.add(TIMEOUT);
         thisPropertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
 
         Set<Relationship> _relationships = new HashSet<>();
@@ -177,8 +187,9 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
         final boolean shared = sharedBoolean == null ? false : sharedBoolean;
         final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
         final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
+        final long timeout = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
 
-        consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
+        consumer.consume(destinationName, durable, shared, subscriptionName, charset, timeout, new ConsumerCallback() {
             @Override
             public void accept(final JMSResponse response) {
                 if (response == null) {
index a2c73b4..e6e5913 100644 (file)
@@ -80,7 +80,7 @@ final class JMSConsumer extends JMSWorker {
     }
 
 
-    public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset,
+    public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset, final long timeout,
                         final ConsumerCallback consumerCallback) {
         this.jmsTemplate.execute(new SessionCallback<Void>() {
             @Override
@@ -88,7 +88,7 @@ final class JMSConsumer extends JMSWorker {
 
                 final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriberName);
                 try {
-                    final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
+                    final Message message = msgConsumer.receive(timeout);
                     JMSResponse response = null;
 
                     if (message != null) {
index 7812e71..eed9276 100644 (file)
@@ -113,7 +113,7 @@ public class JMSPublisherConsumerIT {
             });
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-            consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     // noop
@@ -143,7 +143,7 @@ public class JMSPublisherConsumerIT {
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             final AtomicBoolean callbackInvoked = new AtomicBoolean();
-            consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     callbackInvoked.set(true);
@@ -190,7 +190,7 @@ public class JMSPublisherConsumerIT {
                         JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
 
                         for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) {
-                            consumer.consume(destinationName, false, false, null, "UTF-8", callback);
+                            consumer.consume(destinationName, false, false, null, "UTF-8", 1000, callback);
                         }
                     } finally {
                         ((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
@@ -229,7 +229,7 @@ public class JMSPublisherConsumerIT {
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             final AtomicBoolean callbackInvoked = new AtomicBoolean();
             try {
-                consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
+                consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
                     @Override
                     public void accept(JMSResponse response) {
                         callbackInvoked.set(true);
@@ -246,7 +246,7 @@ public class JMSPublisherConsumerIT {
 
             // should receive the same message, but will process it successfully
             while (!callbackInvoked.get()) {
-                consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
+                consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
                     @Override
                     public void accept(JMSResponse response) {
                         if (response == null) {
@@ -265,7 +265,7 @@ public class JMSPublisherConsumerIT {
             // receiving next message and fail again
             try {
                 while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
+                    consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
                         @Override
                         public void accept(JMSResponse response) {
                             if (response == null) {
@@ -287,7 +287,7 @@ public class JMSPublisherConsumerIT {
             // should receive the same message, but will process it successfully
             try {
                 while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
+                    consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
                         @Override
                         public void accept(JMSResponse response) {
                             if (response == null) {