NIFI-4436: Ensure that on save, we assign a Versioned Component Identifier to inner...
authorMark Payne <markap14@hotmail.com>
Thu, 4 Jan 2018 21:09:02 +0000 (16:09 -0500)
committerBryan Bende <bbende@apache.org>
Mon, 8 Jan 2018 18:10:13 +0000 (13:10 -0500)
NIFI-4436: Fixed a bug that caused a flow not to successfully change version if a connection is added to an existing component and that component is running at time of version change

NIFI-4436: Fixed bug with ordering of controller services being enabled and disabled

NIFI-4436: Fixed bug that prevented local input and output ports from being stopped and started as needed

NIFI-4436: Fixed bugs around referencing controller services that are at a higher level than the versioned flow

NIFI-4436: Ensure that we clear components from FlowController's cache when removed and that they are added to cache when created.

NIFI-4436: Fixed error message coming back if component is invalid when trying to be restarted/re-enabled

NIFI-4436: Addressed issue with children of a removed process group not being considered 'affected components' and as a result not being stopped/disabled/restarted/re-enabled

This closes #2219.

Signed-off-by: Matt Gilman <matt.c.gilman@gmail.com>
16 files changed:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/ResumeFlowException.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java

index 95024ca..dd7c6c4 100644 (file)
@@ -26,6 +26,8 @@ import java.util.Collection;
 public class AffectedComponentDTO {
     public static final String COMPONENT_TYPE_PROCESSOR = "PROCESSOR";
     public static final String COMPONENT_TYPE_CONTROLLER_SERVICE = "CONTROLLER_SERVICE";
+    public static final String COMPONENT_TYPE_INPUT_PORT = "INPUT_PORT";
+    public static final String COMPONENT_TYPE_OUTPUT_PORT = "OUTPUT_PORT";
     public static final String COMPONENT_TYPE_REMOTE_INPUT_PORT = "REMOTE_INPUT_PORT";
     public static final String COMPONENT_TYPE_REMOTE_OUTPUT_PORT = "REMOTE_OUTPUT_PORT";
 
@@ -58,6 +60,7 @@ public class AffectedComponentDTO {
 
     @ApiModelProperty(value = "The type of this component",
         allowableValues = COMPONENT_TYPE_PROCESSOR + "," + COMPONENT_TYPE_CONTROLLER_SERVICE + ", "
+            + COMPONENT_TYPE_INPUT_PORT + ", " + COMPONENT_TYPE_OUTPUT_PORT + ", "
             + COMPONENT_TYPE_REMOTE_INPUT_PORT + ", " + COMPONENT_TYPE_REMOTE_OUTPUT_PORT)
     public String getReferenceType() {
         return referenceType;
index 010ecdf..ae5416c 100644 (file)
@@ -20,6 +20,7 @@ import java.net.URL;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 
 import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.bundle.BundleCoordinate;
@@ -84,6 +85,16 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
     void enableControllerServices(Collection<ControllerServiceNode> serviceNodes);
 
     /**
+     * Enables the collection of services in the background. If a service in this collection
+     * depends on another service, the service being depended on must either already be enabled
+     * or must be in the collection as well.
+     *
+     * @param serviceNodes the nodes
+     * @return a Future that can be used to cancel the task or wait until it is completed
+     */
+    Future<Void> enableControllerServicesAsync(Collection<ControllerServiceNode> serviceNodes);
+
+    /**
      * Disables the given controller service so that it cannot be used by other
      * components. This allows configuration to be updated or allows service to
      * be removed.
@@ -93,8 +104,17 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
     CompletableFuture<Void> disableControllerService(ControllerServiceNode serviceNode);
 
     /**
+     * Disables the collection of services in the background. If any of the services given is referenced
+     * by another service, then that other service must either be disabled or be in the given collection.
+     *
+     * @param serviceNodes the nodes the disable
+     * @return a Future that can be used to cancel the task or wait until it is completed
+     */
+    Future<Void> disableControllerServicesAsync(Collection<ControllerServiceNode> serviceNodes);
+
+    /**
      * @return a Set of all Controller Services that exist for this service
-     * provider
+     *         provider
      */
     Set<ControllerServiceNode> getAllControllerServices();
 
index eb4b8b9..88dc11c 100644 (file)
@@ -245,6 +245,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -3610,12 +3611,22 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     @Override
+    public Future<Void> enableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
+        return controllerServiceProvider.enableControllerServicesAsync(serviceNodes);
+    }
+
+    @Override
     public CompletableFuture<Void> disableControllerService(final ControllerServiceNode serviceNode) {
         serviceNode.verifyCanDisable();
         return controllerServiceProvider.disableControllerService(serviceNode);
     }
 
     @Override
+    public Future<Void> disableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
+        return controllerServiceProvider.disableControllerServicesAsync(serviceNodes);
+    }
+
+    @Override
     public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
         controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
     }
index 48ad849..b4d9e8b 100644 (file)
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.ClassUtils;
@@ -50,13 +51,13 @@ import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.LoggableComponent;
-import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
+import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.logging.ComponentLog;
@@ -78,7 +79,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
     private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
 
-    private final ProcessScheduler processScheduler;
+    private final StandardProcessScheduler processScheduler;
     private final BulletinRepository bulletinRepo;
     private final StateManagerProvider stateManagerProvider;
     private final VariableRegistry variableRegistry;
@@ -87,7 +88,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
     private final ConcurrentMap<String, ControllerServiceNode> serviceCache = new ConcurrentHashMap<>();
 
-    public StandardControllerServiceProvider(final FlowController flowController, final ProcessScheduler scheduler, final BulletinRepository bulletinRepo,
+    public StandardControllerServiceProvider(final FlowController flowController, final StandardProcessScheduler scheduler, final BulletinRepository bulletinRepo,
             final StateManagerProvider stateManagerProvider, final VariableRegistry variableRegistry, final NiFiProperties nifiProperties) {
 
         this.flowController = flowController;
@@ -384,6 +385,74 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         }
     }
 
+    @Override
+    public Future<Void> enableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        processScheduler.submitFrameworkTask(() -> {
+            enableControllerServices(serviceNodes, future);
+            future.complete(null);
+        });
+
+        return future;
+    }
+
+    private void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> completableFuture) {
+        // validate that we are able to start all of the services.
+        Iterator<ControllerServiceNode> serviceIter = serviceNodes.iterator();
+        while (serviceIter.hasNext()) {
+            ControllerServiceNode controllerServiceNode = serviceIter.next();
+            List<ControllerServiceNode> requiredServices = ((StandardControllerServiceNode) controllerServiceNode).getRequiredControllerServices();
+            for (ControllerServiceNode requiredService : requiredServices) {
+                if (!requiredService.isActive() && !serviceNodes.contains(requiredService)) {
+                    logger.error("Cannot enable {} because it has a dependency on {}, which is not enabled", controllerServiceNode, requiredService);
+                    completableFuture.completeExceptionally(new IllegalStateException("Cannot enable " + controllerServiceNode
+                        + " because it has a dependency on " + requiredService + ", which is not enabled"));
+                    return;
+                }
+            }
+        }
+
+        for (final ControllerServiceNode controllerServiceNode : serviceNodes) {
+            if (completableFuture.isCancelled()) {
+                return;
+            }
+
+            try {
+                if (!controllerServiceNode.isActive()) {
+                    final Future<Void> future = enableControllerServiceDependenciesFirst(controllerServiceNode);
+
+                    while (true) {
+                        try {
+                            future.get(1, TimeUnit.SECONDS);
+                            logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
+                            break;
+                        } catch (final TimeoutException e) {
+                            if (completableFuture.isCancelled()) {
+                                return;
+                            }
+                        } catch (final Exception e) {
+                            logger.warn("Failed to enable service {}", controllerServiceNode, e);
+                            completableFuture.completeExceptionally(e);
+
+                            if (this.bulletinRepo != null) {
+                                this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
+                                    Severity.ERROR.name(), "Could not enable " + controllerServiceNode + " due to " + e));
+                            }
+
+                            return;
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("Failed to enable " + controllerServiceNode, e);
+                if (this.bulletinRepo != null) {
+                    this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
+                        Severity.ERROR.name(), "Could not start " + controllerServiceNode + " due to " + e));
+                }
+            }
+        }
+    }
+
     private Future<Void> enableControllerServiceDependenciesFirst(ControllerServiceNode serviceNode) {
         final Map<ControllerServiceNode, Future<Void>> futures = new HashMap<>();
 
@@ -461,6 +530,58 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     }
 
     @Override
+    public Future<Void> disableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        processScheduler.submitFrameworkTask(() -> {
+            disableControllerServices(serviceNodes, future);
+            future.complete(null);
+        });
+
+        return future;
+    }
+
+    private void disableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> future) {
+        final Set<ControllerServiceNode> serviceNodeSet = new HashSet<>(serviceNodes);
+
+        // Verify that for each Controller Service given, any service that references it is either disabled or is also in the given collection
+        for (final ControllerServiceNode serviceNode : serviceNodes) {
+            final List<ControllerServiceNode> references = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
+            for (final ControllerServiceNode reference : references) {
+                if (reference.isActive()) {
+                    try {
+                        reference.verifyCanDisable(serviceNodeSet);
+                    } catch (final Exception e) {
+                        future.completeExceptionally(e);
+                    }
+                }
+            }
+        }
+
+        for (final ControllerServiceNode serviceNode : serviceNodes) {
+            if (serviceNode.isActive()) {
+                disableReferencingServices(serviceNode);
+                final CompletableFuture<?> serviceFuture = disableControllerService(serviceNode);
+
+                while (true) {
+                    try {
+                        serviceFuture.get(1, TimeUnit.SECONDS);
+                        break;
+                    } catch (final TimeoutException e) {
+                        if (future.isCancelled()) {
+                            return;
+                        }
+
+                        continue;
+                    } catch (final Exception e) {
+                        logger.error("Failed to disable {}", serviceNode, e);
+                        future.completeExceptionally(e);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
     public ControllerService getControllerService(final String serviceIdentifier) {
         final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
         return node == null ? null : node.getProxiedControllerService();
index 9418f40..4cd6e2a 100644 (file)
@@ -738,7 +738,8 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
 
         for (final ControllerServiceNode cs : group.getControllerServices(false)) {
-            group.removeControllerService(cs);
+            // Must go through Controller Service here because we need to ensure that it is removed from the cache
+            flowController.removeControllerService(cs);
         }
 
         for (final ProcessGroup childGroup : new ArrayList<>(group.getProcessGroups())) {
@@ -3158,9 +3159,13 @@ public final class StandardProcessGroup implements ProcessGroup {
                     .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
             });
 
-        processGroup.getProcessGroups().stream()
-            .filter(childGroup -> childGroup.getVersionControlInformation() == null)
-            .forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup));
+        for (final ProcessGroup childGroup : processGroup.getProcessGroups()) {
+            if (childGroup.getVersionControlInformation() == null) {
+                applyVersionedComponentIds(childGroup, lookup);
+            } else if (!childGroup.getVersionedComponentId().isPresent()) {
+                childGroup.setVersionedComponentId(lookup.apply(childGroup.getIdentifier()));
+            }
+        }
     }
 
 
@@ -3242,7 +3247,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
             final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents());
 
-            final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow, getAncestorGroupServiceIds(), new StaticDifferenceDescriptor());
+            final FlowComparator flowComparator = new StandardFlowComparator(remoteFlow, localFlow, getAncestorGroupServiceIds(), new StaticDifferenceDescriptor());
             final FlowComparison flowComparison = flowComparator.compare();
 
             final Set<String> updatedVersionedComponentIds = new HashSet<>();
@@ -3387,7 +3392,6 @@ public final class StandardProcessGroup implements ProcessGroup {
             .map(VariableDescriptor::getName)
             .collect(Collectors.toSet());
 
-
         final Map<String, String> updatedVariableMap = new HashMap<>();
 
         // If any new variables exist in the proposed flow, add those to the variable registry.
@@ -3477,6 +3481,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
             if (childGroup == null) {
                 final ProcessGroup added = addProcessGroup(group, proposedChildGroup, componentIdSeed, variablesToSkip);
+                flowController.onProcessGroupAdded(added);
                 added.findAllRemoteProcessGroups().stream().forEach(RemoteProcessGroup::initialize);
                 LOG.info("Added {} to {}", added, this);
             } else if (childCoordinates == null || updateDescendantVersionedGroups) {
@@ -3496,6 +3501,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier());
             if (funnel == null) {
                 final Funnel added = addFunnel(group, proposedFunnel, componentIdSeed);
+                flowController.onFunnelAdded(added);
                 LOG.info("Added {} to {}", added, this);
             } else if (updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) {
                 updateFunnel(funnel, proposedFunnel);
@@ -3517,6 +3523,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier());
             if (port == null) {
                 final Port added = addInputPort(group, proposedPort, componentIdSeed);
+                flowController.onInputPortAdded(added);
                 LOG.info("Added {} to {}", added, this);
             } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
                 updatePort(port, proposedPort);
@@ -3537,6 +3544,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier());
             if (port == null) {
                 final Port added = addOutputPort(group, proposedPort, componentIdSeed);
+                flowController.onOutputPortAdded(added);
                 LOG.info("Added {} to {}", added, this);
             } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
                 updatePort(port, proposedPort);
@@ -3580,6 +3588,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
             if (processor == null) {
                 final ProcessorNode added = addProcessor(group, proposedProcessor, componentIdSeed);
+                flowController.onProcessorAdded(added);
 
                 final Set<Relationship> proposedAutoTerminated =
                     proposedProcessor.getAutoTerminatedRelationships() == null ? Collections.emptySet() : proposedProcessor.getAutoTerminatedRelationships().stream()
@@ -3638,6 +3647,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier());
             if (connection == null) {
                 final Connection added = addConnection(group, proposedConnection, componentIdSeed);
+                flowController.onConnectionAdded(added);
                 LOG.info("Added {} to {}", added, this);
             } else if (isUpdateable(connection)) {
                 // If the connection needs to be updated, then the source and destination will already have
@@ -3658,6 +3668,7 @@ public final class StandardProcessGroup implements ProcessGroup {
             final Connection connection = connectionsByVersionedId.get(removedVersionedId);
             LOG.info("Removing {} from {}", connection, group);
             group.removeConnection(connection);
+            flowController.onConnectionRemoved(connection);
         }
 
         // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
@@ -3670,7 +3681,8 @@ public final class StandardProcessGroup implements ProcessGroup {
         for (final String removedVersionedId : controllerServicesRemoved) {
             final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId);
             LOG.info("Removing {} from {}", service, group);
-            group.removeControllerService(service);
+            // Must remove Controller Service through Flow Controller in order to remove from cache
+            flowController.removeControllerService(service);
         }
 
         for (final String removedVersionedId : funnelsRemoved) {
@@ -4065,13 +4077,6 @@ public final class StandardProcessGroup implements ProcessGroup {
                     // to the instance ID of the Controller Service.
                     final String serviceVersionedComponentId = entry.getValue();
                     String instanceId = getServiceInstanceId(serviceVersionedComponentId, group);
-                    if (instanceId == null) {
-                        // We didn't find the instance ID based on the Versioned Component ID. So we want to just
-                        // leave the value set to whatever it currently is, if it's currently set.
-                        final PropertyDescriptor propertyDescriptor = new PropertyDescriptor.Builder().name(entry.getKey()).build();
-                        instanceId = currentProperties.get(propertyDescriptor);
-                    }
-
                     value = instanceId == null ? serviceVersionedComponentId : instanceId;
                 } else {
                     value = entry.getValue();
@@ -4085,13 +4090,9 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     private String getServiceInstanceId(final String serviceVersionedComponentId, final ProcessGroup group) {
-        for (final ControllerServiceNode serviceNode : group.getControllerServices(false)) {
+        for (final ControllerServiceNode serviceNode : group.getControllerServices(true)) {
             final Optional<String> optionalVersionedId = serviceNode.getVersionedComponentId();
-            if (!optionalVersionedId.isPresent()) {
-                continue;
-            }
-
-            final String versionedId = optionalVersionedId.get();
+            final String versionedId = optionalVersionedId.orElseGet(() -> UUID.nameUUIDFromBytes(serviceNode.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString());
             if (versionedId.equals(serviceVersionedComponentId)) {
                 return serviceNode.getIdentifier();
             }
@@ -4319,7 +4320,6 @@ public final class StandardProcessGroup implements ProcessGroup {
                 }
             }
 
-
             // Ensure that all Processors are instantiate-able.
             final Map<String, VersionedProcessor> proposedProcessors = new HashMap<>();
             findAllProcessors(updatedFlow.getFlowContents(), proposedProcessors);
index 314738a..c0b36c9 100644 (file)
@@ -47,7 +47,6 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.LoggableComponent;
-import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReloadComponent;
 import org.apache.nifi.controller.ReportingTaskNode;
@@ -269,7 +268,7 @@ public class TestStandardProcessScheduler {
      */
     @Test
     public void validateServiceEnablementLogicHappensOnlyOnce() throws Exception {
-        final ProcessScheduler scheduler = createScheduler();
+        final StandardProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
         final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
@@ -308,7 +307,7 @@ public class TestStandardProcessScheduler {
      */
     @Test
     public void validateDisabledServiceCantBeDisabled() throws Exception {
-        final ProcessScheduler scheduler = createScheduler();
+        final StandardProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
         final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
@@ -346,7 +345,7 @@ public class TestStandardProcessScheduler {
      */
     @Test
     public void validateEnabledServiceCanOnlyBeDisabledOnce() throws Exception {
-        final ProcessScheduler scheduler = createScheduler();
+        final StandardProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
         final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
@@ -380,7 +379,7 @@ public class TestStandardProcessScheduler {
 
     @Test
     public void validateDisablingOfTheFailedService() throws Exception {
-        final ProcessScheduler scheduler = createScheduler();
+        final StandardProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
         final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
@@ -412,7 +411,7 @@ public class TestStandardProcessScheduler {
     @Test
     @Ignore
     public void validateEnabledDisableMultiThread() throws Exception {
-        final ProcessScheduler scheduler = createScheduler();
+        final StandardProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
         final ExecutorService executor = Executors.newCachedThreadPool();
         for (int i = 0; i < 200; i++) {
@@ -455,7 +454,7 @@ public class TestStandardProcessScheduler {
      */
     @Test
     public void validateNeverEnablingServiceCanStillBeDisabled() throws Exception {
-        final ProcessScheduler scheduler = createScheduler();
+        final StandardProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
         final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
@@ -481,7 +480,7 @@ public class TestStandardProcessScheduler {
      */
     @Test
     public void validateLongEnablingServiceCanStillBeDisabled() throws Exception {
-        final ProcessScheduler scheduler = createScheduler();
+        final StandardProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties);
         final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(),
                 "1", systemBundle.getBundleDetails().getCoordinate(), null, false);
@@ -581,7 +580,7 @@ public class TestStandardProcessScheduler {
         }
     }
 
-    private ProcessScheduler createScheduler() {
+    private StandardProcessScheduler createScheduler() {
         return new StandardProcessScheduler(null, null, stateMgrProvider, nifiProperties);
     }
 }
index 0d15143..ed335e9 100644 (file)
@@ -38,7 +38,6 @@ import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.LoggableComponent;
-import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReloadComponent;
 import org.apache.nifi.controller.ScheduledState;
@@ -146,7 +145,7 @@ public class TestStandardControllerServiceProvider {
         final FlowController controller = Mockito.mock(FlowController.class);
         Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
 
-        final ProcessScheduler scheduler = createScheduler();
+        final StandardProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider =
                 new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties);
 
@@ -162,7 +161,7 @@ public class TestStandardControllerServiceProvider {
         final FlowController controller = Mockito.mock(FlowController.class);
         Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(group);
 
-        final ProcessScheduler scheduler = createScheduler();
+        final StandardProcessScheduler scheduler = createScheduler();
         final StandardControllerServiceProvider provider =
                 new StandardControllerServiceProvider(controller, scheduler, null, stateManagerProvider, variableRegistry, niFiProperties);
 
@@ -214,13 +213,13 @@ public class TestStandardControllerServiceProvider {
      */
     @Test(timeout = 120000)
     public void testConcurrencyWithEnablingReferencingServicesGraph() throws InterruptedException {
-        final ProcessScheduler scheduler = createScheduler();
+        final StandardProcessScheduler scheduler = createScheduler();
         for (int i = 0; i < 5000; i++) {
             testEnableReferencingServicesGraph(scheduler);
         }
     }
 
-    public void testEnableReferencingServicesGraph(final ProcessScheduler scheduler) {
+    public void testEnableReferencingServicesGraph(final StandardProcessScheduler scheduler) {
         final ProcessGroup procGroup = new MockProcessGroup(controller);
         final FlowController controller = Mockito.mock(FlowController.class);
         Mockito.when(controller.getGroup(Mockito.anyString())).thenReturn(procGroup);
index ab96747..165af45 100644 (file)
@@ -116,6 +116,7 @@ import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowEntity;
 import org.apache.nifi.web.api.entity.VersionedFlowSnapshotMetadataEntity;
 
+import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -778,6 +779,15 @@ public interface NiFiServiceFacade {
     PortEntity getInputPort(String inputPortId);
 
     /**
+     * Gets an input port as it is available to the given user
+     *
+     * @param inputPortId The input port id
+     * @param user the user
+     * @return port
+     */
+    PortEntity getInputPort(String inputPortId, NiFiUser user);
+
+    /**
      * Gets all input ports in a given group.
      *
      * @param groupId The id of the group
@@ -847,6 +857,15 @@ public interface NiFiServiceFacade {
     PortEntity getOutputPort(String outputPortId);
 
     /**
+     * Gets an output port as it is available to the given user
+     *
+     * @param outputPortId The output port id
+     * @param user the user
+     * @return port
+     */
+    PortEntity getOutputPort(String outputPortId, NiFiUser user);
+
+    /**
      * Gets all output ports in a given group.
      *
      * @param groupId The id of the group
@@ -1008,7 +1027,7 @@ public interface NiFiServiceFacade {
      * @param state the state
      * @param serviceIds the id's of the services
      */
-    void verifyActivateControllerServices(String processGroupId, ControllerServiceState state, Set<String> serviceIds);
+    void verifyActivateControllerServices(String processGroupId, ControllerServiceState state, Collection<String> serviceIds);
 
     /**
      * Enables or disables the controller services with the given IDs & Revisions on behalf of the currently logged in user
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/ResumeFlowException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/ResumeFlowException.java
new file mode 100644 (file)
index 0000000..8e1e123
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web;
+
+/**
+ * Thrown whenever a flow cannot be resumed due to validation error, etc.
+ */
+public class ResumeFlowException extends Exception {
+    public ResumeFlowException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    public ResumeFlowException(final String message) {
+        super(message);
+    }
+}
index 1ccced2..6e0e9d2 100644 (file)
@@ -16,7 +16,9 @@
  */
 package org.apache.nifi.web;
 
-import com.google.common.collect.Sets;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.action.Component;
@@ -271,8 +273,8 @@ import org.apache.nifi.web.util.SnippetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
+import com.google.common.collect.Sets;
+
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -484,8 +486,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) {
-        processGroupDAO.verifyActivateControllerServices(groupId, state, serviceIds);
+    public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Collection<String> serviceIds) {
+        processGroupDAO.verifyActivateControllerServices(state, serviceIds);
     }
 
     @Override
@@ -1016,7 +1018,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
                 @Override
                 public RevisionUpdate<ActivateControllerServicesEntity> update() {
                     // schedule the components
-                    processGroupDAO.activateControllerServices(processGroupId, state, serviceRevisions.keySet());
+                    processGroupDAO.activateControllerServices(state, serviceRevisions.keySet());
 
                     // update the revisions
                     final Map<String, Revision> updatedRevisions = new HashMap<>();
@@ -3289,8 +3291,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     private PortEntity createInputPortEntity(final Port port) {
+        return createInputPortEntity(port, NiFiUserUtils.getNiFiUser());
+    }
+
+    private PortEntity createInputPortEntity(final Port port, final NiFiUser user) {
         final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier()));
-        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
+        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, user);
         final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getInputPortStatus(port.getIdentifier()));
         final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
         final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
@@ -3298,8 +3304,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     private PortEntity createOutputPortEntity(final Port port) {
+        return createOutputPortEntity(port, NiFiUserUtils.getNiFiUser());
+    }
+
+    private PortEntity createOutputPortEntity(final Port port, final NiFiUser user) {
         final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(port.getIdentifier()));
-        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port);
+        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(port, user);
         final PortStatusDTO status = dtoFactory.createPortStatusDto(controllerFacade.getOutputPortStatus(port.getIdentifier()));
         final List<BulletinDTO> bulletins = dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForSource(port.getIdentifier()));
         final List<BulletinEntity> bulletinEntities = bulletins.stream().map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissions.getCanRead())).collect(Collectors.toList());
@@ -3409,6 +3419,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public PortEntity getInputPort(final String inputPortId, final NiFiUser user) {
+        final Port port = inputPortDAO.getPort(inputPortId);
+        return createInputPortEntity(port, user);
+    }
+
+    @Override
     public PortStatusEntity getInputPortStatus(final String inputPortId) {
         final Port inputPort = inputPortDAO.getPort(inputPortId);
         final PermissionsDTO permissions = dtoFactory.createPermissionsDto(inputPort);
@@ -3423,6 +3439,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
+    public PortEntity getOutputPort(final String outputPortId, final NiFiUser user) {
+        final Port port = outputPortDAO.getPort(outputPortId);
+        return createOutputPortEntity(port, user);
+    }
+
+    @Override
     public PortStatusEntity getOutputPortStatus(final String outputPortId) {
         final Port outputPort = outputPortDAO.getPort(outputPortId);
         final PermissionsDTO permissions = dtoFactory.createPermissionsDto(outputPort);
@@ -3974,28 +3996,95 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
             })
             .collect(Collectors.toCollection(HashSet::new));
 
-        final Map<String, List<Connection>> connectionsByVersionedId = group.findAllConnections().stream()
-            .filter(conn -> conn.getVersionedComponentId().isPresent())
-            .collect(Collectors.groupingBy(conn -> conn.getVersionedComponentId().get()));
+        for (final FlowDifference difference : comparison.getDifferences()) {
+            final VersionedComponent localComponent = difference.getComponentA();
+            if (localComponent == null) {
+                continue;
+            }
 
+            // If any Process Group is removed, consider all components below that Process Group as an affected component
+            if (difference.getDifferenceType() == DifferenceType.COMPONENT_REMOVED && localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.PROCESS_GROUP) {
+                final String localGroupId = ((InstantiatedVersionedProcessGroup) localComponent).getInstanceId();
+                final ProcessGroup localGroup = processGroupDAO.getProcessGroup(localGroupId);
+
+                localGroup.findAllProcessors().stream()
+                    .map(comp -> createAffectedComponentEntity(comp, user))
+                    .forEach(affectedComponents::add);
+                localGroup.findAllFunnels().stream()
+                    .map(comp -> createAffectedComponentEntity(comp, user))
+                    .forEach(affectedComponents::add);
+                localGroup.findAllInputPorts().stream()
+                    .map(comp -> createAffectedComponentEntity(comp, user))
+                    .forEach(affectedComponents::add);
+                localGroup.findAllOutputPorts().stream()
+                    .map(comp -> createAffectedComponentEntity(comp, user))
+                    .forEach(affectedComponents::add);
+                localGroup.findAllRemoteProcessGroups().stream()
+                    .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream()))
+                    .map(comp -> createAffectedComponentEntity(comp, user))
+                    .forEach(affectedComponents::add);
+                localGroup.findAllControllerServices().stream()
+                    .map(comp -> createAffectedComponentEntity(comp, user))
+                    .forEach(affectedComponents::add);
+            }
+
+            if (localComponent.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONTROLLER_SERVICE) {
+                final String serviceId = ((InstantiatedVersionedControllerService) localComponent).getInstanceId();
+                final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);
+
+                final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
+                for (final ControllerServiceNode referencingService : referencingServices) {
+                    affectedComponents.add(createAffectedComponentEntity(referencingService, user));
+                }
+
+                final List<ProcessorNode> referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
+                for (final ProcessorNode referencingProcessor : referencingProcessors) {
+                    affectedComponents.add(createAffectedComponentEntity(referencingProcessor, user));
+                }
+            }
+        }
+
+        // Create a map of all connectable components by versioned component ID to the connectable component itself
+        final Map<String, List<Connectable>> connectablesByVersionId = new HashMap<>();
+        mapToConnectableId(group.findAllFunnels(), connectablesByVersionId);
+        mapToConnectableId(group.findAllInputPorts(), connectablesByVersionId);
+        mapToConnectableId(group.findAllOutputPorts(), connectablesByVersionId);
+        mapToConnectableId(group.findAllProcessors(), connectablesByVersionId);
+
+        final List<RemoteGroupPort> remotePorts = new ArrayList<>();
+        for (final RemoteProcessGroup rpg : group.findAllRemoteProcessGroups()) {
+            remotePorts.addAll(rpg.getInputPorts());
+            remotePorts.addAll(rpg.getOutputPorts());
+        }
+        mapToConnectableId(remotePorts, connectablesByVersionId);
+
+        // If any connection is added or modified, we need to stop both the source (if it exists in the flow currently)
+        // and the destination (if it exists in the flow currently).
         for (final FlowDifference difference : comparison.getDifferences()) {
             VersionedComponent component = difference.getComponentA();
             if (component == null) {
                 component = difference.getComponentB();
             }
 
-            if (component.getComponentType() == org.apache.nifi.registry.flow.ComponentType.CONNECTION) {
-                final VersionedConnection connection = (VersionedConnection) component;
+            if (component.getComponentType() != org.apache.nifi.registry.flow.ComponentType.CONNECTION) {
+                continue;
+            }
 
-                final String versionedConnectionId = connection.getIdentifier();
-                final List<Connection> instances = connectionsByVersionedId.get(versionedConnectionId);
-                if (instances == null) {
-                    continue;
+            final VersionedConnection connection = (VersionedConnection) component;
+
+            final String sourceVersionedId = connection.getSource().getId();
+            final List<Connectable> sources = connectablesByVersionId.get(sourceVersionedId);
+            if (sources != null) {
+                for (final Connectable source : sources) {
+                    affectedComponents.add(createAffectedComponentEntity(source, user));
                 }
+            }
 
-                for (final Connection instance : instances) {
-                    affectedComponents.add(createAffectedComponentEntity(instance.getSource(), user));
-                    affectedComponents.add(createAffectedComponentEntity(instance.getDestination(), user));
+            final String destinationVersionId = connection.getDestination().getId();
+            final List<Connectable> destinations = connectablesByVersionId.get(destinationVersionId);
+            if (destinations != null) {
+                for (final Connectable destination : destinations) {
+                    affectedComponents.add(createAffectedComponentEntity(destination, user));
                 }
             }
         }
@@ -4003,6 +4092,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         return affectedComponents;
     }
 
+    private void mapToConnectableId(final Collection<? extends Connectable> connectables, final Map<String, List<Connectable>> destination) {
+        for (final Connectable connectable : connectables) {
+            final Optional<String> versionedId = connectable.getVersionedComponentId();
+            if (!versionedId.isPresent()) {
+                continue;
+            }
+
+            final List<Connectable> byVersionedId = destination.computeIfAbsent(versionedId.get(), key -> new ArrayList<>());
+            byVersionedId.add(connectable);
+        }
+    }
+
 
     private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable, final NiFiUser user) {
         final AffectedComponentEntity entity = new AffectedComponentEntity();
@@ -4023,6 +4124,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         return entity;
     }
 
+    private AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceNode serviceNode, final NiFiUser user) {
+        final AffectedComponentEntity entity = new AffectedComponentEntity();
+        entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier())));
+        entity.setId(serviceNode.getIdentifier());
+
+        final Authorizable authorizable = authorizableLookup.getControllerService(serviceNode.getIdentifier()).getAuthorizable();
+        final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable, user);
+        entity.setPermissions(permissionsDto);
+
+        final AffectedComponentDTO dto = new AffectedComponentDTO();
+        dto.setId(serviceNode.getIdentifier());
+        dto.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
+        dto.setProcessGroupId(serviceNode.getProcessGroupIdentifier());
+        dto.setState(serviceNode.getState().name());
+
+        entity.setComponent(dto);
+        return entity;
+    }
+
     private AffectedComponentEntity createAffectedComponentEntity(final InstantiatedVersionedComponent instance, final String componentTypeName, final String componentState, final NiFiUser user) {
         final AffectedComponentEntity entity = new AffectedComponentEntity();
         entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(instance.getInstanceId())));
index b106948..d8c8ddf 100644 (file)
@@ -45,6 +45,7 @@ import org.apache.nifi.registry.flow.VersionedProcessGroup;
 import org.apache.nifi.util.BundleUtils;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.ResourceNotFoundException;
+import org.apache.nifi.web.ResumeFlowException;
 import org.apache.nifi.web.Revision;
 import org.apache.nifi.web.api.concurrent.AsyncRequestManager;
 import org.apache.nifi.web.api.concurrent.AsynchronousWebRequest;
@@ -1128,6 +1129,11 @@ public class VersionsResource extends ApplicationResource {
                             idGenerationSeed, true, true);
 
                         vcur.markComplete(updatedVersionControlEntity);
+                    } catch (final ResumeFlowException rfe) {
+                        // Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow
+                        // since in this case the flow was successfully updated - we just couldn't re-enable the components.
+                        logger.error(rfe.getMessage(), rfe);
+                        vcur.setFailureReason(rfe.getMessage());
                     } catch (final Exception e) {
                         logger.error("Failed to update flow to new version", e);
                         vcur.setFailureReason("Failed to update flow to new version due to " + e);
@@ -1301,6 +1307,11 @@ public class VersionsResource extends ApplicationResource {
                             idGenerationSeed, false, true);
 
                         vcur.markComplete(updatedVersionControlEntity);
+                    } catch (final ResumeFlowException rfe) {
+                        // Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow
+                        // since in this case the flow was successfully updated - we just couldn't re-enable the components.
+                        logger.error(rfe.getMessage(), rfe);
+                        vcur.setFailureReason(rfe.getMessage());
                     } catch (final Exception e) {
                         logger.error("Failed to update flow to new version", e);
                         vcur.setFailureReason("Failed to update flow to new version due to " + e.getMessage());
@@ -1333,13 +1344,15 @@ public class VersionsResource extends ApplicationResource {
     private VersionControlInformationEntity updateFlowVersion(final String groupId, final ComponentLifecycle componentLifecycle, final URI exampleUri,
         final Set<AffectedComponentEntity> affectedComponents, final NiFiUser user, final boolean replicateRequest, final Revision revision, final VersionControlInformationEntity requestEntity,
         final VersionedFlowSnapshot flowSnapshot, final AsynchronousWebRequest<VersionControlInformationEntity> asyncRequest, final String idGenerationSeed,
-        final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException {
+        final boolean verifyNotModified, final boolean updateDescendantVersionedFlows) throws LifecycleManagementException, ResumeFlowException {
 
         // Steps 6-7: Determine which components must be stopped and stop them.
         final Set<String> stoppableReferenceTypes = new HashSet<>();
         stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
         stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT);
         stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT);
+        stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT);
+        stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT);
 
         final Set<AffectedComponentEntity> runningComponents = affectedComponents.stream()
             .filter(dto -> stoppableReferenceTypes.contains(dto.getComponent().getReferenceType()))
@@ -1459,7 +1472,14 @@ public class VersionsResource extends ApplicationResource {
                 asyncRequest.setCancelCallback(enableServicesPause::cancel);
                 final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices, user);
                 logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
-                componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause);
+
+                try {
+                    componentLifecycle.activateControllerServices(exampleUri, user, groupId, servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause);
+                } catch (final IllegalStateException ise) {
+                    // Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide
+                    // a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
+                    throw new ResumeFlowException("Failed to re-enable Controller Services because " + ise.getMessage(), ise);
+                }
             }
 
             if (!asyncRequest.isCancelled()) {
@@ -1474,7 +1494,14 @@ public class VersionsResource extends ApplicationResource {
                 final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                 asyncRequest.setCancelCallback(startComponentsPause::cancel);
                 logger.info("Restarting {} Processors", componentsToStart.size());
-                componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause);
+
+                try {
+                    componentLifecycle.scheduleComponents(exampleUri, user, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause);
+                } catch (final IllegalStateException ise) {
+                    // Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide
+                    // a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
+                    throw new ResumeFlowException("Failed to restart components because " + ise.getMessage(), ise);
+                }
             }
         }
 
index 4198303..3d9f521 100644 (file)
@@ -179,6 +179,7 @@ import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
 import org.apache.nifi.web.api.entity.FlowBreadcrumbEntity;
+import org.apache.nifi.web.api.entity.PortEntity;
 import org.apache.nifi.web.api.entity.PortStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
@@ -1799,6 +1800,32 @@ public final class DtoFactory {
         return component;
     }
 
+    public AffectedComponentEntity createAffectedComponentEntity(final PortEntity portEntity, final String referenceType) {
+        if (portEntity == null) {
+            return null;
+        }
+
+        final AffectedComponentEntity component = new AffectedComponentEntity();
+        component.setBulletins(portEntity.getBulletins());
+        component.setId(portEntity.getId());
+        component.setPermissions(portEntity.getPermissions());
+        component.setPosition(portEntity.getPosition());
+        component.setRevision(portEntity.getRevision());
+        component.setUri(portEntity.getUri());
+
+        final PortDTO portDto = portEntity.getComponent();
+        final AffectedComponentDTO componentDto = new AffectedComponentDTO();
+        componentDto.setId(portDto.getId());
+        componentDto.setName(portDto.getName());
+        componentDto.setProcessGroupId(portDto.getParentGroupId());
+        componentDto.setReferenceType(referenceType);
+        componentDto.setState(portDto.getState());
+        componentDto.setValidationErrors(portDto.getValidationErrors());
+        component.setComponent(componentDto);
+
+        return component;
+    }
+
     public AffectedComponentEntity createAffectedComponentEntity(final ControllerServiceEntity serviceEntity) {
         if (serviceEntity == null) {
             return null;
index 459acfc..7582420 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
 import org.apache.nifi.web.api.dto.VariableRegistryDTO;
 import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Future;
@@ -74,11 +75,10 @@ public interface ProcessGroupDAO {
     /**
      * Verifies the specified controller services can be modified
      *
-     * @param groupId the ID of the process group
      * @param state the desired state
      * @param serviceIds the ID's of the controller services
      */
-    void verifyActivateControllerServices(String groupId, ControllerServiceState state, Set<String> serviceIds);
+    void verifyActivateControllerServices(ControllerServiceState state, Collection<String> serviceIds);
 
     /**
      * Schedules the components in the specified process group.
@@ -93,11 +93,10 @@ public interface ProcessGroupDAO {
     /**
      * Enables or disables the controller services in the specified process group
      *
-     * @param groupId the id of the group
      * @param state the desired state
      * @param serviceIds the ID's of the services to enable or disable
      */
-    Future<Void> activateControllerServices(String groupId, ControllerServiceState state, Set<String> serviceIds);
+    Future<Void> activateControllerServices(ControllerServiceState state, Collection<String> serviceIds);
 
     /**
      * Updates the specified process group.
index 5bbb56f..e1d9e69 100644 (file)
@@ -41,8 +41,10 @@ import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 import org.apache.nifi.web.api.entity.VariableEntity;
 import org.apache.nifi.web.dao.ProcessGroupDAO;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -130,18 +132,18 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
     }
 
     @Override
-    public void verifyActivateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) {
-        final ProcessGroup group = locateProcessGroup(flowController, groupId);
-
-        group.findAllControllerServices().stream()
-            .filter(service -> serviceIds.contains(service.getIdentifier()))
-            .forEach(service -> {
-                if (state == ControllerServiceState.ENABLED) {
-                    service.verifyCanEnable();
-                } else {
-                    service.verifyCanDisable();
-                }
-            });
+    public void verifyActivateControllerServices(final ControllerServiceState state, final Collection<String> serviceIds) {
+        final Set<ControllerServiceNode> serviceNodes = serviceIds.stream()
+            .map(flowController::getControllerServiceNode)
+            .collect(Collectors.toSet());
+
+        for (final ControllerServiceNode serviceNode : serviceNodes) {
+            if (state == ControllerServiceState.ENABLED) {
+                serviceNode.verifyCanEnable(serviceNodes);
+            } else {
+                serviceNode.verifyCanDisable(serviceNodes);
+            }
+        }
     }
 
     @Override
@@ -201,26 +203,16 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
     }
 
     @Override
-    public Future<Void> activateControllerServices(final String groupId, final ControllerServiceState state, final Set<String> serviceIds) {
-        final ProcessGroup group = locateProcessGroup(flowController, groupId);
-
-        CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
-        for (final String serviceId : serviceIds) {
-            final ControllerServiceNode serviceNode = group.findControllerService(serviceId, true, true);
-            if (serviceNode == null) {
-                throw new ResourceNotFoundException("Could not find Controller Service with identifier " + serviceId);
-            }
-
-            if (ControllerServiceState.ENABLED.equals(state)) {
-                final CompletableFuture<Void> serviceFuture = flowController.enableControllerService(serviceNode);
-                future = CompletableFuture.allOf(future, serviceFuture);
-            } else {
-                final CompletableFuture<Void> serviceFuture = flowController.disableControllerService(serviceNode);
-                future = CompletableFuture.allOf(future, serviceFuture);
-            }
+    public Future<Void> activateControllerServices(final ControllerServiceState state, final Collection<String> serviceIds) {
+        final List<ControllerServiceNode> serviceNodes = serviceIds.stream()
+            .map(flowController::getControllerServiceNode)
+            .collect(Collectors.toList());
+
+        if (state == ControllerServiceState.ENABLED) {
+            return flowController.enableControllerServicesAsync(serviceNodes);
+        } else {
+            return flowController.disableControllerServicesAsync(serviceNodes);
         }
-
-        return future;
     }
 
     @Override
index a801dcb..f257bb1 100644 (file)
@@ -27,6 +27,7 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
 import org.apache.nifi.web.api.entity.AffectedComponentEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.PortEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
 
@@ -39,6 +40,14 @@ public class AffectedComponentUtils {
             case AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR:
                 final ProcessorEntity procEntity = serviceFacade.getProcessor(componentEntity.getId(), user);
                 return dtoFactory.createAffectedComponentEntity(procEntity);
+            case AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT: {
+                final PortEntity portEntity = serviceFacade.getInputPort(componentEntity.getId(), user);
+                return dtoFactory.createAffectedComponentEntity(portEntity, AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT);
+            }
+            case AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT: {
+                final PortEntity portEntity = serviceFacade.getOutputPort(componentEntity.getId(), user);
+                return dtoFactory.createAffectedComponentEntity(portEntity, AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT);
+            }
             case AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE:
                 final ControllerServiceEntity serviceEntity = serviceFacade.getControllerService(componentEntity.getId(), user);
                 return dtoFactory.createAffectedComponentEntity(serviceEntity);
index e005d28..1c7e82d 100644 (file)
@@ -18,7 +18,9 @@
 package org.apache.nifi.web.util;
 
 import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceState;
 import org.apache.nifi.web.NiFiServiceFacade;
 import org.apache.nifi.web.Revision;
@@ -33,6 +35,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
@@ -228,6 +233,46 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
         waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.DISABLED, pause, user);
     }
 
+    static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
+        final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>();
+
+        for (final ControllerServiceNode node : serviceNodeMap.values()) {
+            final List<ControllerServiceNode> branch = new ArrayList<>();
+            determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<ControllerServiceNode>());
+            orderedNodeLists.add(branch);
+        }
+
+        return orderedNodeLists;
+    }
+
+    private static void determineEnablingOrder(
+        final Map<String, ControllerServiceNode> serviceNodeMap,
+        final ControllerServiceNode contextNode,
+        final List<ControllerServiceNode> orderedNodes,
+        final Set<ControllerServiceNode> visited) {
+        if (visited.contains(contextNode)) {
+            return;
+        }
+
+        for (final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet()) {
+            if (entry.getKey().getControllerServiceDefinition() != null) {
+                final String referencedServiceId = entry.getValue();
+                if (referencedServiceId != null) {
+                    final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId);
+                    if (!orderedNodes.contains(referencedNode)) {
+                        visited.add(contextNode);
+                        determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited);
+                    }
+                }
+            }
+        }
+
+        if (!orderedNodes.contains(contextNode)) {
+            orderedNodes.add(contextNode);
+        }
+    }
+
+
     /**
      * Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
      *