NIFI-5944: When components are started on NiFi startup, if they are invalid, don...
authorMark Payne <markap14@hotmail.com>
Thu, 10 Jan 2019 14:53:05 +0000 (09:53 -0500)
committerMatt Gilman <matt.c.gilman@gmail.com>
Thu, 10 Jan 2019 16:15:17 +0000 (11:15 -0500)
This closes #3259

nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java

index f3ae41f..e17682e 100644 (file)
@@ -373,9 +373,8 @@ public abstract class AbstractComponentNode implements ComponentNode {
     }
 
     @Override
-    public final void performValidation() {
-        boolean replaced = false;
-        do {
+    public final ValidationStatus performValidation() {
+        while (true) {
             final ValidationState validationState = getValidationState();
 
             final ValidationContext validationContext = getValidationContext();
@@ -391,8 +390,11 @@ public abstract class AbstractComponentNode implements ComponentNode {
 
             final ValidationStatus status = results.isEmpty() ? ValidationStatus.VALID : ValidationStatus.INVALID;
             final ValidationState updatedState = new ValidationState(status, results);
-            replaced = replaceValidationState(validationState, updatedState);
-        } while (!replaced);
+            final boolean replaced = replaceValidationState(validationState, updatedState);
+            if (replaced) {
+                return status;
+            }
+        }
     }
 
     protected Collection<ValidationResult> computeValidationErrors(final ValidationContext validationContext) {
index d0ed572..2357d41 100644 (file)
@@ -179,7 +179,7 @@ public interface ComponentNode extends ComponentAuthorizable {
     /**
      * Asynchronously begins the validation process
      */
-    public abstract void performValidation();
+    public abstract ValidationStatus performValidation();
 
     /**
      * Returns a {@link List} of all {@link PropertyDescriptor}s that this
index 6e8206e..12eeb88 100644 (file)
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller;
 
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.controller.scheduling.LifecycleState;
@@ -144,7 +145,13 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con
     public ScheduledState getScheduledState() {
         ScheduledState sc = this.scheduledState.get();
         if (sc == ScheduledState.STARTING) {
-            return ScheduledState.RUNNING;
+            final ValidationStatus validationStatus = getValidationStatus();
+
+            if (validationStatus == ValidationStatus.INVALID) {
+                return ScheduledState.STOPPED;
+            } else {
+                return ScheduledState.RUNNING;
+            }
         } else if (sc == ScheduledState.STOPPING) {
             return ScheduledState.STOPPED;
         }
@@ -240,4 +247,12 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con
      * will result in the WARN message if processor can not be enabled.
      */
     public abstract void disable();
+
+    /**
+     * Returns the Scheduled State that is desired for this Processor. This may vary from the current state if the Processor is not
+     * currently valid, is in the process of stopping but should then transition to Running, etc.
+     *
+     * @return the desired state for this Processor
+     */
+    public abstract ScheduledState getDesiredState();
 }
index dad01b8..8ab7e69 100644 (file)
@@ -912,7 +912,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
 
                     try {
                         if (connectable instanceof ProcessorNode) {
-                            ((ProcessorNode) connectable).getValidationStatus(5, TimeUnit.SECONDS);
                             connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
                         } else {
                             startConnectable(connectable);
@@ -1242,7 +1241,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
                         return ScheduledState.RUNNING;
                     }
 
-                    return procNode.getScheduledState();
+                    return procNode.getDesiredState();
                 }
 
                 @Override
@@ -1699,7 +1698,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode.getIdentifier() + " because the controller is terminated");
         }
 
-        reportingTaskNode.performValidation(); // ensure that the reporting task has completed its validation before attempting to start it
         reportingTaskNode.verifyCanStart();
         reportingTaskNode.reloadAdditionalResourcesIfNecessary();
         processScheduler.schedule(reportingTaskNode);
index 4adfad4..8ca5173 100644 (file)
@@ -38,6 +38,7 @@ import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
@@ -139,7 +140,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
     private final ProcessScheduler processScheduler;
     private long runNanos = 0L;
     private volatile long yieldNanos;
-    private volatile ScheduledState desiredState;
+    private volatile ScheduledState desiredState = ScheduledState.STOPPED;
     private volatile LogLevel bulletinLevel = LogLevel.WARN;
 
     private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
@@ -1343,13 +1344,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
     public void start(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final long timeoutMillis, final ProcessContext processContext,
             final SchedulingAgentCallback schedulingAgentCallback, final boolean failIfStopping) {
 
-        switch (getValidationStatus()) {
-            case INVALID:
-                throw new IllegalStateException("Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors());
-            case VALIDATING:
-                throw new IllegalStateException("Processor " + this.getName() + " cannot be started because its validation is still being performed");
-        }
-
         final Processor processor = processorRef.get().getProcessor();
         final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
 
@@ -1487,6 +1481,25 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
 
         // Create a task to invoke the @OnScheduled annotation of the processor
         final Callable<Void> startupTask = () -> {
+            final ScheduledState currentScheduleState = scheduledState.get();
+            if (currentScheduleState == ScheduledState.STOPPING || currentScheduleState == ScheduledState.STOPPED) {
+                LOG.debug("{} is stopped. Will not call @OnScheduled lifecycle methods or begin trigger onTrigger() method", StandardProcessorNode.this);
+                schedulingAgentCallback.onTaskComplete();
+                return null;
+            }
+
+            final ValidationStatus validationStatus = getValidationStatus();
+            if (validationStatus != ValidationStatus.VALID) {
+                LOG.debug("Cannot start {} because Processor is currently not valid; will try again after 5 seconds", StandardProcessorNode.this);
+
+                // re-initiate the entire process
+                final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, processContext, schedulingAgentCallback);
+                taskScheduler.schedule(initiateStartTask, 5, TimeUnit.SECONDS);
+
+                schedulingAgentCallback.onTaskComplete();
+                return null;
+            }
+
             LOG.debug("Invoking @OnScheduled methods of {}", processor);
 
             // Now that the task has been scheduled, set the timeout
@@ -1696,6 +1709,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
         return future;
     }
 
+    @Override
+    public ScheduledState getDesiredState() {
+        return desiredState;
+    }
 
     private void monitorAsyncTask(final Future<?> taskFuture, final Future<?> monitoringFuture, final long completionTimestamp) {
         if (taskFuture.isDone()) {
index bce85f8..f1b585e 100644 (file)
  */
 package org.apache.nifi.controller.reporting;
 
-import java.net.URL;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.AbstractComponentNode;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -51,6 +46,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.AnnotationUtils;
 
+import java.net.URL;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 public abstract class AbstractReportingTaskNode extends AbstractComponentNode implements ReportingTaskNode {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractReportingTaskNode.class);
@@ -176,7 +177,7 @@ public abstract class AbstractReportingTaskNode extends AbstractComponentNode im
 
     @Override
     public boolean isValidationNecessary() {
-        return !processScheduler.isScheduled(this);
+        return !processScheduler.isScheduled(this) || getValidationStatus() != ValidationStatus.VALID;
     }
 
     @Override
index 2ff3307..a7d5fd8 100644 (file)
@@ -21,6 +21,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
@@ -186,13 +187,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
             throw new IllegalStateException("Reporting Task " + taskNode.getName() + " cannot be started because it has " + activeThreadCount + " threads still running");
         }
 
-        switch (taskNode.getValidationStatus()) {
-            case INVALID:
-                throw new IllegalStateException("Reporting Task " + taskNode.getName() + " is not in a valid state for the following reasons: " + taskNode.getValidationErrors());
-            case VALIDATING:
-                throw new IllegalStateException("Reporting Task " + taskNode.getName() + " cannot be scheduled because it is in the process of validating its configuration");
-        }
-
         final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
         lifecycleState.setScheduled(true);
 
@@ -216,6 +210,13 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                             return;
                         }
 
+                        final ValidationStatus validationStatus = taskNode.getValidationStatus();
+                        if (validationStatus != ValidationStatus.VALID) {
+                            LOG.debug("Cannot schedule {} to run because it is currently invalid. Will try again in 5 seconds", taskNode);
+                            componentLifeCycleThreadPool.schedule(this, 5, TimeUnit.SECONDS);
+                            return;
+                        }
+
                         try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), reportingTask.getClass(), reportingTask.getIdentifier())) {
                             ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
                         }
@@ -231,8 +232,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                             + "ReportingTask and will attempt to schedule it again after {}",
                             new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
 
-                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, taskNode.getConfigurationContext());
-                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, taskNode.getConfigurationContext());
+
+                    try (final NarCloseable x = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), reportingTask.getClass(), reportingTask.getIdentifier())) {
+                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, taskNode.getConfigurationContext());
+                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, taskNode.getConfigurationContext());
+                    }
 
                     componentLifeCycleThreadPool.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
                 }
index 39693b8..4a6b420 100644 (file)
@@ -30,7 +30,7 @@ public interface ScheduledStateLookup {
     public static final ScheduledStateLookup IDENTITY_LOOKUP = new ScheduledStateLookup() {
         @Override
         public ScheduledState getScheduledState(final ProcessorNode procNode) {
-            return procNode.getScheduledState();
+            return procNode.getDesiredState();
         }
 
         @Override
index 795fc8c..3d6329f 100644 (file)
  */
 package org.apache.nifi.controller.service;
 
-import java.lang.reflect.InvocationTargetException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.documentation.DeprecationNotice;
@@ -47,8 +28,7 @@ import org.apache.nifi.authorization.resource.ResourceType;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.ConfigurableComponent;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.validation.ValidationState;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.AbstractComponentNode;
 import org.apache.nifi.controller.ComponentNode;
@@ -71,6 +51,24 @@ import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 public class StandardControllerServiceNode extends AbstractComponentNode implements ControllerServiceNode {
 
     private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
@@ -319,14 +317,6 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
         if (getState() != ControllerServiceState.DISABLED) {
             throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled");
         }
-
-        final ValidationState validationState = getValidationState();
-        switch (validationState.getStatus()) {
-            case INVALID:
-                throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not valid: " + validationState.getValidationErrors());
-            case VALIDATING:
-                throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because its validation has not yet completed");
-        }
     }
 
     @Override
@@ -334,11 +324,6 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
         if (getState() != ControllerServiceState.DISABLED) {
             throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled");
         }
-
-        final Collection<ValidationResult> validationErrors = getValidationErrors(ignoredReferences);
-        if (ignoredReferences != null && !validationErrors.isEmpty()) {
-            throw new IllegalStateException("Controller Service with ID " + getIdentifier() + " cannot be enabled because it is not currently valid: " + validationErrors);
-        }
     }
 
     @Override
@@ -389,8 +374,11 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
             case DISABLED:
             case DISABLING:
                 return true;
-            case ENABLED:
             case ENABLING:
+                // If enabling and currently not valid, then we must trigger validation to occur. This allows the #enable method
+                // to continue running in the background and complete enabling when the service becomes valid.
+                return getValidationStatus() != ValidationStatus.VALID;
+            case ENABLED:
             default:
                 return false;
         }
@@ -398,7 +386,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
 
     /**
      * Will atomically enable this service by invoking its @OnEnabled operation.
-     * It uses CAS operation on {@link #stateRef} to transition this service
+     * It uses CAS operation on {@link #stateTransition} to transition this service
      * from DISABLED to ENABLING state. If such transition succeeds the service
      * will be marked as 'active' (see {@link ControllerServiceNode#isActive()}).
      * If such transition doesn't succeed then no enabling logic will be
@@ -429,6 +417,20 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
             scheduler.execute(new Runnable() {
                 @Override
                 public void run() {
+                    if (!isActive()) {
+                        LOG.debug("{} is no longer active so will not attempt to enable it", StandardControllerServiceNode.this);
+                        stateTransition.disable();
+                        return;
+                    }
+
+                    final ValidationStatus validationStatus = getValidationStatus();
+                    if (validationStatus != ValidationStatus.VALID) {
+                        LOG.debug("Cannot enable {} because it is not currently valid. Will try again in 5 seconds", StandardControllerServiceNode.this);
+                        scheduler.schedule(this, 5, TimeUnit.SECONDS);
+                        future.completeExceptionally(new RuntimeException(this + " cannot be enabled because it is not currently valid. Will try again in 5 seconds."));
+                        return;
+                    }
+
                     try {
                         try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getControllerServiceImplementation().getClass(), getIdentifier())) {
                             ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, getControllerServiceImplementation(), configContext);
@@ -446,7 +448,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
                             invokeDisable(configContext);
                             stateTransition.disable();
                         } else {
-                            LOG.debug("Successfully enabled {}", service);
+                            LOG.info("Successfully enabled {}", service);
                         }
                     } catch (Exception e) {
                         future.completeExceptionally(e);
@@ -478,7 +480,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
 
     /**
      * Will atomically disable this service by invoking its @OnDisabled operation.
-     * It uses CAS operation on {@link #stateRef} to transition this service
+     * It uses CAS operation on {@link #stateTransition} to transition this service
      * from ENABLED to DISABLING state. If such transition succeeds the service
      * will be de-activated (see {@link ControllerServiceNode#isActive()}).
      * If such transition doesn't succeed (the service is still in ENABLING state)
index 920999e..242c0ad 100644 (file)
@@ -24,6 +24,7 @@ import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -92,6 +93,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -342,6 +344,8 @@ public class TestStandardProcessScheduler {
         final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
 
+        serviceNode.performValidation();
+
         assertFalse(serviceNode.isActive());
         final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
         final ExecutorService executor = Executors.newCachedThreadPool();
@@ -361,10 +365,10 @@ public class TestStandardProcessScheduler {
                 }
             });
         }
-        // need to sleep a while since we are emulating async invocations on
-        // method that is also internally async
-        Thread.sleep(500);
+
         executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
+
         assertFalse(asyncFailed.get());
         assertEquals(1, ts.enableInvocationCount());
     }
@@ -399,10 +403,9 @@ public class TestStandardProcessScheduler {
                 }
             });
         }
-        // need to sleep a while since we are emulating async invocations on
-        // method that is also internally async
-        Thread.sleep(500);
+
         executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
         assertFalse(asyncFailed.get());
         assertEquals(0, ts.disableInvocationCount());
     }
@@ -419,8 +422,10 @@ public class TestStandardProcessScheduler {
         final ControllerServiceNode serviceNode = flowManager.createControllerService(SimpleTestService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
 
+        assertSame(ValidationStatus.VALID, serviceNode.performValidation());
+
         final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation();
-        scheduler.enableControllerService(serviceNode);
+        scheduler.enableControllerService(serviceNode).get();
         assertTrue(serviceNode.isActive());
         final ExecutorService executor = Executors.newCachedThreadPool();
 
@@ -441,8 +446,8 @@ public class TestStandardProcessScheduler {
         }
         // need to sleep a while since we are emulating async invocations on
         // method that is also internally async
-        Thread.sleep(500);
         executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS); // change to seconds.
         assertFalse(asyncFailed.get());
         assertEquals(1, ts.disableInvocationCount());
     }
@@ -453,9 +458,17 @@ public class TestStandardProcessScheduler {
 
         final ControllerServiceNode serviceNode = flowManager.createControllerService(FailingService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
-        scheduler.enableControllerService(serviceNode);
-        Thread.sleep(1000);
+        serviceNode.performValidation();
+
+        final Future<?> future = scheduler.enableControllerService(serviceNode);
+        try {
+            future.get();
+        } catch (final Exception e) {
+            // Expected behavior because the FailingService throws Exception when attempting to enable
+        }
+
         scheduler.shutdown();
+
         /*
          * Because it was never disabled it will remain active since its
          * enabling is being retried. This may actually be a bug in the
@@ -528,14 +541,20 @@ public class TestStandardProcessScheduler {
 
         final ControllerServiceNode serviceNode = flowManager.createControllerService(LongEnablingService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false, true);
+
         final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation();
         ts.setLimit(Long.MAX_VALUE);
+
+        serviceNode.performValidation();
         scheduler.enableControllerService(serviceNode);
-        Thread.sleep(100);
+
         assertTrue(serviceNode.isActive());
+        final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+        while (ts.enableInvocationCount() != 1 && System.nanoTime() <= maxTime) {
+            Thread.sleep(1L);
+        }
         assertEquals(1, ts.enableInvocationCount());
 
-        Thread.sleep(1000);
         scheduler.disableControllerService(serviceNode);
         assertFalse(serviceNode.isActive());
         assertEquals(ControllerServiceState.DISABLING, serviceNode.getState());
index daee3c8..ebeb916 100644 (file)
@@ -21,6 +21,7 @@ import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.validation.ValidationStatus;
 import org.apache.nifi.components.validation.ValidationTrigger;
 import org.apache.nifi.controller.ExtensionBuilder;
 import org.apache.nifi.controller.FlowController;
@@ -73,6 +74,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 public class TestStandardControllerServiceProvider {
@@ -197,8 +199,8 @@ public class TestStandardControllerServiceProvider {
         provider.disableControllerService(serviceNode);
     }
 
-    @Test(timeout = 1000000)
-    public void testEnableDisableWithReference() {
+    @Test(timeout = 10000)
+    public void testEnableDisableWithReference() throws InterruptedException {
         final ProcessGroup group = new MockProcessGroup(controller);
         final FlowController controller = Mockito.mock(FlowController.class);
         final FlowManager flowManager = Mockito.mock(FlowManager.class);
@@ -221,17 +223,24 @@ public class TestStandardControllerServiceProvider {
 
         try {
             provider.enableControllerService(serviceNodeA);
-            Assert.fail("Was able to enable Service A but Service B is disabled.");
         } catch (final IllegalStateException expected) {
         }
 
+        assertSame(ControllerServiceState.ENABLING, serviceNodeA.getState());
+
         serviceNodeB.performValidation();
-        serviceNodeB.getValidationStatus(5, TimeUnit.SECONDS);
+        assertSame(ValidationStatus.VALID, serviceNodeB.getValidationStatus(5, TimeUnit.SECONDS));
         provider.enableControllerService(serviceNodeB);
 
         serviceNodeA.performValidation();
-        serviceNodeA.getValidationStatus(5, TimeUnit.SECONDS);
-        provider.enableControllerService(serviceNodeA);
+        assertSame(ValidationStatus.VALID, serviceNodeA.getValidationStatus(5, TimeUnit.SECONDS));
+
+        final long maxTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
+        // Wait for Service A to become ENABLED. This will happen in a background thread after approximately 5 seconds, now that Service A is valid.
+        while (serviceNodeA.getState() != ControllerServiceState.ENABLED && System.nanoTime() <= maxTime) {
+            Thread.sleep(5L);
+        }
+        assertSame(ControllerServiceState.ENABLED, serviceNodeA.getState());
 
         try {
             provider.disableControllerService(serviceNodeB);