NIFI-5869 Support Reconnection for JMS
authorEd <edward.berezitsky@gmail.com>
Thu, 10 Jan 2019 16:55:29 +0000 (11:55 -0500)
committerMatthew Burgess <mattyb149@apache.org>
Wed, 13 Feb 2019 19:18:05 +0000 (14:18 -0500)
resets worker if it doesn't work anymore for any reason. this will add "reconnect" capabilities. Will solve issues for following use cases:
- authentication changed after successful connection
- JNDI mapping changed and requires recaching.
- JMS server isn't available anymore or restarted.

improved controller reset on exception

Signed-off-by: Matthew Burgess <mattyb149@apache.org>
This closes #3261

nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java

index adb94fd..6bab920 100644 (file)
@@ -35,4 +35,12 @@ public interface JMSConnectionFactoryProviderDefinition extends ControllerServic
      */
     ConnectionFactory getConnectionFactory();
 
+    /**
+     * Resets {@link ConnectionFactory}.
+     * Provider should reset {@link ConnectionFactory} only if a copy provided by a client matches
+     * current {@link ConnectionFactory}.
+     * @param cachedFactory - {@link ConnectionFactory} cached by client.
+     */
+    void resetConnectionFactory(ConnectionFactory cachedFactory);
+
 }
index ecb4e7a..781ce65 100644 (file)
@@ -139,6 +139,14 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
                 .build();
     }
 
+    @Override
+    public void resetConnectionFactory(ConnectionFactory cachedFactory) {
+        if (cachedFactory == connectionFactory) {
+            getLogger().debug("Resetting connection factory");
+            connectionFactory = null;
+        }
+    }
+
     /**
      * @return new instance of {@link ConnectionFactory}
      */
@@ -316,5 +324,4 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
             return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, input, context);
         }
     }
-
 }
index a293d84..44d8d99 100644 (file)
@@ -139,6 +139,14 @@ public class JndiJmsConnectionFactoryProvider extends AbstractControllerService
     }
 
     @Override
+    public synchronized void resetConnectionFactory(ConnectionFactory cachedFactory) {
+        if (cachedFactory == connectionFactory) {
+            getLogger().debug("Resetting connection factory");
+            connectionFactory = null;
+        }
+    }
+
+    @Override
     public synchronized ConnectionFactory getConnectionFactory() {
         if (connectionFactory == null) {
             connectionFactory = lookupConnectionFactory();
index 0094eaf..f47cf78 100644 (file)
@@ -158,7 +158,27 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
         try {
             rendezvousWithJms(context, session, worker);
         } finally {
-            workerPool.offer(worker);
+            //in case of exception during worker's connection (consumer or publisher),
+            //an appropriate service is responsible to invalidate the worker.
+            //if worker is not valid anymore, don't put it back into a pool, try to rebuild it first, or discard.
+            //this will be helpful in a situation, when JNDI has changed, or JMS server is not available
+            //and reconnection is required.
+            if (worker == null || !worker.isValid()){
+                getLogger().debug("Worker is invalid. Will try re-create... ");
+                final JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
+                try {
+                    // Safe to cast. Method #buildTargetResource(ProcessContext context) sets only CachingConnectionFactory
+                    CachingConnectionFactory currentCF = (CachingConnectionFactory)worker.jmsTemplate.getConnectionFactory();
+                    cfProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());
+                    worker = buildTargetResource(context);
+                }catch(Exception e) {
+                    getLogger().error("Failed to rebuild:  " + cfProvider);
+                    worker = null;
+                }
+            }
+            if (worker != null) {
+                workerPool.offer(worker);
+            }
         }
     }
 
index 997c6dd..4b149e2 100644 (file)
@@ -188,28 +188,33 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
         final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
         final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
 
-        consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
-            @Override
-            public void accept(final JMSResponse response) {
-                if (response == null) {
-                    return;
-                }
+        try {
+            consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
+                @Override
+                public void accept(final JMSResponse response) {
+                    if (response == null) {
+                        return;
+                    }
 
-                FlowFile flowFile = processSession.create();
-                flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
+                    FlowFile flowFile = processSession.create();
+                    flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
 
-                final Map<String, String> jmsHeaders = response.getMessageHeaders();
-                final Map<String, String> jmsProperties = response.getMessageProperties();
+                    final Map<String, String> jmsHeaders = response.getMessageHeaders();
+                    final Map<String, String> jmsProperties = response.getMessageProperties();
 
-                flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
-                flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
-                flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
+                    flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
+                    flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
+                    flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
 
-                processSession.getProvenanceReporter().receive(flowFile, destinationName);
-                processSession.transfer(flowFile, REL_SUCCESS);
-                processSession.commit();
-            }
-        });
+                    processSession.getProvenanceReporter().receive(flowFile, destinationName);
+                    processSession.transfer(flowFile, REL_SUCCESS);
+                    processSession.commit();
+                }
+            });
+        } catch(Exception e) {
+            consumer.setValid(false);
+            throw e; // for backward compatibility with exception handling in flows
+        }
     }
 
     /**
index e6fa1bb..ee4d76d 100644 (file)
@@ -36,6 +36,7 @@ abstract class JMSWorker {
     protected final JmsTemplate jmsTemplate;
     protected final ComponentLog processLog;
     private final CachingConnectionFactory connectionFactory;
+    private boolean isValid = true;
 
 
     /**
@@ -61,4 +62,12 @@ abstract class JMSWorker {
         return this.getClass().getSimpleName() + "[destination:" + this.jmsTemplate.getDefaultDestinationName()
                 + "; pub-sub:" + this.jmsTemplate.isPubSubDomain() + ";]";
     }
+
+    public boolean isValid() {
+        return isValid;
+    }
+
+    public void setValid(boolean isValid) {
+        this.isValid = isValid;
+    }
 }
index a32a895..12451cf 100644 (file)
@@ -123,11 +123,21 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
                 String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
                 switch (context.getProperty(MESSAGE_BODY).getValue()) {
                     case TEXT_MESSAGE:
-                        publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), flowFile.getAttributes());
+                        try {
+                            publisher.publish(destinationName, this.extractTextMessageBody(flowFile, processSession, charset), flowFile.getAttributes());
+                        } catch(Exception e) {
+                            publisher.setValid(false);
+                            throw e;
+                        }
                         break;
                     case BYTES_MESSAGE:
                     default:
-                        publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
+                        try {
+                            publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
+                        } catch(Exception e) {
+                            publisher.setValid(false);
+                            throw e;
+                        }
                         break;
                 }
                 processSession.transfer(flowFile, REL_SUCCESS);
@@ -136,6 +146,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
                 processSession.transfer(flowFile, REL_FAILURE);
                 this.getLogger().error("Failed while sending message to JMS via " + publisher, e);
                 context.yield();
+                publisher.setValid(false);
             }
         }
     }