NIFI-4436:
authorMark Payne <markap14@hotmail.com>
Thu, 10 Aug 2017 14:02:35 +0000 (10:02 -0400)
committerBryan Bende <bbende@apache.org>
Mon, 8 Jan 2018 17:44:52 +0000 (12:44 -0500)
- Initial checkpoint: able ot start version control and detect changes, in standalone mode, still 'crude' implementation
- Checkpoint: Can place flow under version control and can determine if modified
- Checkpoint: Change version working in some cases. Does not work if processor removed because COMPONENT_REMOVED type has ComponentA whose ID is the VersionedComponentID but we are trying to call ProcessorDAO.get() with this ID
- Checkpoint: Able to change flow from Version 1 to Version 2 and back. Not yet tested with controller services. Have not tried changing/removing connections. Not cluster-friendly yet. All inline, not in background. Have not taken into account ports, funnels, remote ports, etc. Have not tested with Labels yet
- Checkpoint after implementing ClusterReplicationComponentLifecycle instead of JerseyClientComponentLifecycle
- Checkpoint: Updated to allow starting version control and updating version in clustered mode
- Checkpoint: Updated versioning endpoint so that when version of a flow is updated, the bundle information is populated and the snapshot is replicated to the cluster.
- Checkpoint: Implemented endpoint for reverting to previously sync'ed version of a flow and updated version control endpoint so that Process Group can be pushed as a new version to existing flow instead of only creating a new flow
- Checkpoint: Updated so that if a Process Group is under Version Control and it has a child Process Group, which is also under Version Control, we can handle that gracefully. Not yet tested because it depends on updates to the nifi-registry module, which can't be compiled due to maven dependency conflicts

106 files changed:
nifi-api/src/main/java/org/apache/nifi/components/VersionedComponent.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/pom.xml
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/AffectedComponentDTO.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ComponentDTO.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConnectableDTO.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RemoteProcessGroupPortDTO.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowDTO.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowUpdateRequestDTO.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/AffectedComponentEntity.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlComponentMappingEntity.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlInformationEntity.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowUpdateRequestEntity.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ControllerEndpointMerger.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/FlowConfigurationEndpointMerger.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/VersionControlInformationEndpointMerger.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/RequestReplicator.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ConnectionEntityMerger.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PortEntityMerger.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessGroupEntityMerger.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ProcessorEntityMerger.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/VersionControlInformationEntityMerger.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/FileBasedClusterNodeFirewallFactoryBean.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/replication/TestThreadPoolRequestReplicator.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connection.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/label/Label.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/label/StandardLabel.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedConnectableComponent.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedConnection.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedControllerService.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedFunnel.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedLabel.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedPort.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessor.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteGroupPort.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteProcessGroup.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroupPortDescriptor.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/FlowControllerFactoryBean.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/nifi-context.xml
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/reporting/TestStandardReportingContext.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/serialization/StandardFlowSerializerTest.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacadeLock.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsynchronousWebRequest.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/RequestManager.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/AffectedComponentUtils.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/CancellableTimedPause.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ClusterReplicationComponentLifecycle.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/ComponentLifecycle.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LifecycleManagementException.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/util/LocalComponentLifecycle.java [new file with mode: 0644]
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/resources/nifi-web-api-context.xml
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
pom.xml

diff --git a/nifi-api/src/main/java/org/apache/nifi/components/VersionedComponent.java b/nifi-api/src/main/java/org/apache/nifi/components/VersionedComponent.java
new file mode 100644 (file)
index 0000000..164a4f2
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components;
+
+import java.util.Optional;
+
+public interface VersionedComponent {
+
+    /**
+     * @return the unique identifier that maps this component to a component that is versioned
+     *         in a Flow Registry, or <code>Optional.empty</code> if this component has not been saved to a Flow Registry.
+     */
+    Optional<String> getVersionedComponentId();
+
+    /**
+     * Updates the versioned component identifier
+     *
+     * @param versionedComponentId the identifier of the versioned component
+     *
+     * @throws IllegalStateException if this component is already under version control with a different ID and
+     *             the given ID is not null
+     */
+    void setVersionedComponentId(String versionedComponentId);
+}
index b47b2bc..69bb4a0 100644 (file)
@@ -26,5 +26,9 @@
             <groupId>io.swagger</groupId>
             <artifactId>swagger-annotations</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi.registry</groupId>
+            <artifactId>nifi-registry-data-model</artifactId>
+        </dependency>
     </dependencies>
 </project>
index 3567b53..95024ca 100644 (file)
@@ -26,6 +26,8 @@ import java.util.Collection;
 public class AffectedComponentDTO {
     public static final String COMPONENT_TYPE_PROCESSOR = "PROCESSOR";
     public static final String COMPONENT_TYPE_CONTROLLER_SERVICE = "CONTROLLER_SERVICE";
+    public static final String COMPONENT_TYPE_REMOTE_INPUT_PORT = "REMOTE_INPUT_PORT";
+    public static final String COMPONENT_TYPE_REMOTE_OUTPUT_PORT = "REMOTE_OUTPUT_PORT";
 
     private String processGroupId;
     private String id;
@@ -54,7 +56,9 @@ public class AffectedComponentDTO {
         this.id = id;
     }
 
-    @ApiModelProperty(value = "The type of this component", allowableValues = COMPONENT_TYPE_PROCESSOR + "," + COMPONENT_TYPE_CONTROLLER_SERVICE)
+    @ApiModelProperty(value = "The type of this component",
+        allowableValues = COMPONENT_TYPE_PROCESSOR + "," + COMPONENT_TYPE_CONTROLLER_SERVICE + ", "
+            + COMPONENT_TYPE_REMOTE_INPUT_PORT + ", " + COMPONENT_TYPE_REMOTE_OUTPUT_PORT)
     public String getReferenceType() {
         return referenceType;
     }
@@ -73,21 +77,6 @@ public class AffectedComponentDTO {
     }
 
     /**
-     * @return scheduled state of the processor referencing a controller service. If this component is another service, this field represents the controller service state
-     */
-    @ApiModelProperty(
-            value = "The scheduled state of a processor or reporting task referencing a controller service. If this component is another controller "
-                    + "service, this field represents the controller service state."
-    )
-    public String getState() {
-        return state;
-    }
-
-    public void setState(String state) {
-        this.state = state;
-    }
-
-    /**
      * @return active thread count for the referencing component
      */
     @ApiModelProperty(
@@ -114,4 +103,14 @@ public class AffectedComponentDTO {
     public void setValidationErrors(Collection<String> validationErrors) {
         this.validationErrors = validationErrors;
     }
+
+    @ApiModelProperty("The scheduled state of a processor or reporting task referencing a controller service. If this component is another controller "
+        + "service, this field represents the controller service state.")
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
 }
index 2feefd7..81915ee 100644 (file)
@@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlType;
 public class ComponentDTO {
 
     private String id;
+    private String versionedComponentId;
 
     private String parentGroupId;
     private PositionDTO position;
@@ -47,6 +48,15 @@ public class ComponentDTO {
         this.id = id;
     }
 
+    @ApiModelProperty("The ID of the corresponding component that is under version control")
+    public String getVersionedComponentId() {
+        return versionedComponentId;
+    }
+
+    public void setVersionedComponentId(final String id) {
+        this.versionedComponentId = id;
+    }
+
     /**
      * @return id for the parent group of this component if applicable, null otherwise
      */
index b820479..a63872e 100644 (file)
@@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlType;
 public class ConnectableDTO {
 
     private String id;
+    private String versionedComponentId;
     private String type;
     private String groupId;
     private String name;
@@ -50,6 +51,16 @@ public class ConnectableDTO {
         this.id = id;
     }
 
+    @ApiModelProperty("The ID of the corresponding component that is under version control")
+    public String getVersionedComponentId() {
+        return versionedComponentId;
+    }
+
+    public void setVersionedComponentId(final String id) {
+        this.versionedComponentId = id;
+    }
+
+
     /**
      * @return type of this connectable component
      */
index c8e4a39..7faf10b 100644 (file)
@@ -30,6 +30,7 @@ public class ProcessGroupDTO extends ComponentDTO {
     private String name;
     private String comments;
     private Map<String, String> variables;
+    private VersionControlInformationDTO versionControlInfo;
 
     private Integer runningCount;
     private Integer stoppedCount;
@@ -203,7 +204,6 @@ public class ProcessGroupDTO extends ComponentDTO {
         this.inactiveRemotePortCount = inactiveRemotePortCount;
     }
 
-
     @ApiModelProperty(value = "The variables that are configured for the Process Group. Note that this map contains only "
         + "those variables that are defined on this Process Group and not any variables that are defined in the parent "
         + "Process Group, etc. I.e., this Map will not contain all variables that are accessible by components in this "
@@ -215,4 +215,14 @@ public class ProcessGroupDTO extends ComponentDTO {
     public void setVariables(final Map<String, String> variables) {
         this.variables = variables;
     }
+
+    @ApiModelProperty("The Version Control information that indicates which Flow Registry, and where in the Flow Registry, "
+        + "this Process Group is tracking to; or null if this Process Group is not under version control")
+    public VersionControlInformationDTO getVersionControlInformation() {
+        return versionControlInfo;
+    }
+
+    public void setVersionControlInformation(final VersionControlInformationDTO versionControlInfo) {
+        this.versionControlInfo = versionControlInfo;
+    }
 }
index 59c5631..8b0ddb4 100644 (file)
@@ -27,6 +27,7 @@ public class RemoteProcessGroupPortDTO {
 
     private String id;
     private String targetId;
+    private String versionedComponentId;
     private String groupId;
     private String name;
     private String comments;
@@ -52,6 +53,15 @@ public class RemoteProcessGroupPortDTO {
         this.comments = comments;
     }
 
+    @ApiModelProperty("The ID of the corresponding component that is under version control")
+    public String getVersionedComponentId() {
+        return versionedComponentId;
+    }
+
+    public void setVersionedComponentId(final String id) {
+        this.versionedComponentId = id;
+    }
+
     /**
      * @return number tasks that may transmit flow files to the target port concurrently
      */
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionControlInformationDTO.java
new file mode 100644 (file)
index 0000000..d27e830
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.dto;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlType;
+
+@XmlType(name = "versionControlInformation")
+public class VersionControlInformationDTO {
+    private String groupId;
+    private String registryId;
+    private String bucketId;
+    private String flowId;
+    private Integer version;
+    private Boolean modified;
+    private Boolean current;
+
+    @ApiModelProperty("The ID of the Process Group that is under version control")
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    @ApiModelProperty("The ID of the registry that the flow is stored in")
+    public String getRegistryId() {
+        return registryId;
+    }
+
+    public void setRegistryId(final String registryId) {
+        this.registryId = registryId;
+    }
+
+    @ApiModelProperty("The ID of the bucket that the flow is stored in")
+    public String getBucketId() {
+        return bucketId;
+    }
+
+    public void setBucketId(final String bucketId) {
+        this.bucketId = bucketId;
+    }
+
+    @ApiModelProperty("The ID of the flow")
+    public String getFlowId() {
+        return flowId;
+    }
+
+    public void setFlowId(final String flowId) {
+        this.flowId = flowId;
+    }
+
+    @ApiModelProperty("The version of the flow")
+    public Integer getVersion() {
+        return version;
+    }
+
+    public void setVersion(final Integer version) {
+        this.version = version;
+    }
+
+    @ApiModelProperty(readOnly=true,
+        value = "Whether or not the flow has been modified since it was last synced to the Flow Registry. The value will be null if this information is not yet known.")
+    public Boolean getModified() {
+        return modified;
+    }
+
+    public void setModified(Boolean modified) {
+        this.modified = modified;
+    }
+
+    @ApiModelProperty(readOnly=true,
+        value = "Whether or not this is the most recent version of the flow in the Flow Registry. The value will be null if this information is not yet known.")
+    public Boolean getCurrent() {
+        return current;
+    }
+
+    public void setCurrent(Boolean current) {
+        this.current = current;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowDTO.java
new file mode 100644 (file)
index 0000000..27a83e6
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.dto;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlType;
+
+@XmlType(name = "versionedFlow")
+public class VersionedFlowDTO {
+    private String registryId = "default"; // placeholder for now.
+    private String bucketId;
+    private String flowId;
+    private String flowName;
+    private String description;
+
+    @ApiModelProperty("The ID of the registry that the flow is tracked to")
+    public String getRegistryId() {
+        return registryId;
+    }
+
+    public void setRegistryId(String registryId) {
+        this.registryId = registryId;
+    }
+
+    @ApiModelProperty("The ID of the bucket where the flow is stored")
+    public String getBucketId() {
+        return bucketId;
+    }
+
+    public void setBucketId(String bucketId) {
+        this.bucketId = bucketId;
+    }
+
+    @ApiModelProperty(value = "The ID of the flow")
+    public String getFlowId() {
+        return flowId;
+    }
+
+    public void setFlowId(String flowId) {
+        this.flowId = flowId;
+    }
+
+    @ApiModelProperty("The name of the flow")
+    public String getFlowName() {
+        return flowName;
+    }
+
+    public void setFlowName(String flowName) {
+        this.flowName = flowName;
+    }
+
+    @ApiModelProperty("A description of the flow")
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowUpdateRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/VersionedFlowUpdateRequestDTO.java
new file mode 100644 (file)
index 0000000..aa42bf6
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.dto;
+
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.web.api.dto.util.TimestampAdapter;
+
+import javax.xml.bind.annotation.XmlType;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import java.util.Date;
+
+@XmlType(name = "versionedFlowUpdateRequest")
+public class VersionedFlowUpdateRequestDTO {
+    private String requestId;
+    private String processGroupId;
+    private String uri;
+    private Date lastUpdated;
+    private boolean complete = false;
+    private String failureReason;
+
+    @ApiModelProperty("The unique ID of the Process Group that the variable registry belongs to")
+    public String getProcessGroupId() {
+        return processGroupId;
+    }
+
+    public void setProcessGroupId(String processGroupId) {
+        this.processGroupId = processGroupId;
+    }
+
+    @ApiModelProperty(value = "The unique ID of this request.", readOnly = true)
+    public String getRequestId() {
+        return requestId;
+    }
+
+    public void setRequestId(String requestId) {
+        this.requestId = requestId;
+    }
+
+    @ApiModelProperty(value = "The URI for future requests to this drop request.", readOnly = true)
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    @XmlJavaTypeAdapter(TimestampAdapter.class)
+    @ApiModelProperty(value = "The last time this request was updated.", dataType = "string", readOnly = true)
+    public Date getLastUpdated() {
+        return lastUpdated;
+    }
+
+    public void setLastUpdated(Date lastUpdated) {
+        this.lastUpdated = lastUpdated;
+    }
+
+    @ApiModelProperty(value = "Whether or not this request has completed", readOnly = true)
+    public boolean isComplete() {
+        return complete;
+    }
+
+    public void setComplete(boolean complete) {
+        this.complete = complete;
+    }
+
+    @ApiModelProperty(value = "An explanation of why this request failed, or null if this request has not failed", readOnly = true)
+    public String getFailureReason() {
+        return failureReason;
+    }
+
+    public void setFailureReason(String reason) {
+        this.failureReason = reason;
+    }
+}
index 0f28f73..e0d8496 100644 (file)
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.nifi.web.api.entity;
 
 import org.apache.nifi.web.api.dto.AffectedComponentDTO;
@@ -33,12 +32,13 @@ public class AffectedComponentEntity extends ComponentEntity implements Permissi
     /**
      * @return variable referencing components that is being serialized
      */
+    @Override
     public AffectedComponentDTO getComponent() {
         return component;
     }
 
+    @Override
     public void setComponent(AffectedComponentDTO component) {
         this.component = component;
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlComponentMappingEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlComponentMappingEntity.java
new file mode 100644 (file)
index 0000000..e1bd6b5
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.entity;
+
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Map;
+
+@XmlRootElement(name = "versionControlComponentMappingEntity")
+public class VersionControlComponentMappingEntity extends Entity {
+    private VersionControlInformationDTO versionControlDto;
+    private Map<String, String> versionControlComponentMapping;
+    private RevisionDTO processGroupRevision;
+
+    @ApiModelProperty("The Version Control information")
+    public VersionControlInformationDTO getVersionControlInformation() {
+        return versionControlDto;
+    }
+
+    public void setVersionControlInformation(VersionControlInformationDTO versionControlDto) {
+        this.versionControlDto = versionControlDto;
+    }
+
+    @ApiModelProperty("The mapping of Versioned Component Identifiers to instance ID's")
+    public Map<String, String> getVersionControlComponentMapping() {
+        return versionControlComponentMapping;
+    }
+
+    public void setVersionControlComponentMapping(Map<String, String> mapping) {
+        this.versionControlComponentMapping = mapping;
+    }
+
+    @ApiModelProperty("The revision of the Process Group")
+    public RevisionDTO getProcessGroupRevision() {
+        return processGroupRevision;
+    }
+
+    public void setProcessGroupRevision(RevisionDTO processGroupRevision) {
+        this.processGroupRevision = processGroupRevision;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlInformationEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionControlInformationEntity.java
new file mode 100644 (file)
index 0000000..e8ec81f
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.entity;
+
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "versionControlInformationEntity")
+public class VersionControlInformationEntity extends Entity {
+    private VersionControlInformationDTO versionControlDto;
+    private RevisionDTO processGroupRevision;
+
+    @ApiModelProperty("The Version Control information")
+    public VersionControlInformationDTO getVersionControlInformation() {
+        return versionControlDto;
+    }
+
+    public void setVersionControlInformation(VersionControlInformationDTO versionControlDto) {
+        this.versionControlDto = versionControlDto;
+    }
+
+    @ApiModelProperty("The Revision for the Process Group")
+    public RevisionDTO getProcessGroupRevision() {
+        return processGroupRevision;
+    }
+
+    public void setProcessGroupRevision(RevisionDTO revision) {
+        this.processGroupRevision = revision;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowEntity.java
new file mode 100644 (file)
index 0000000..b94255a
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.entity;
+
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.VersionedFlowDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "versionedFlow")
+public class VersionedFlowEntity extends Entity {
+    private VersionedFlowDTO versionedFlow;
+    private RevisionDTO processGroupRevision;
+
+    @ApiModelProperty("The versioned flow")
+    public VersionedFlowDTO getVersionedFlow() {
+        return versionedFlow;
+    }
+
+    public void setVersionedFlow(VersionedFlowDTO versionedFLow) {
+        this.versionedFlow = versionedFLow;
+    }
+
+    @ApiModelProperty("The Revision of the Process Group under Version Control")
+    public RevisionDTO getProcessGroupRevision() {
+        return processGroupRevision;
+    }
+
+    public void setProcessGroupRevision(final RevisionDTO revision) {
+        this.processGroupRevision = revision;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowSnapshotEntity.java
new file mode 100644 (file)
index 0000000..170640d
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.entity;
+
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "versionedFlowSnapshot")
+public class VersionedFlowSnapshotEntity extends Entity {
+    private VersionedFlowSnapshot versionedFlowSnapshot;
+    private RevisionDTO processGroupRevision;
+    private String registryId;
+
+    @ApiModelProperty("The versioned flow snapshot")
+    public VersionedFlowSnapshot getVersionedFlowSnapshot() {
+        return versionedFlowSnapshot;
+    }
+
+    public void setVersionedFlow(VersionedFlowSnapshot versionedFlowSnapshot) {
+        this.versionedFlowSnapshot = versionedFlowSnapshot;
+    }
+
+    @ApiModelProperty("The Revision of the Process Group under Version Control")
+    public RevisionDTO getProcessGroupRevision() {
+        return processGroupRevision;
+    }
+
+    public void setProcessGroupRevision(final RevisionDTO revision) {
+        this.processGroupRevision = revision;
+    }
+
+    @ApiModelProperty("The ID of the Registry that this flow belongs to")
+    public String getRegistryId() {
+        return registryId;
+    }
+
+    public void setRegistryId(String registryId) {
+        this.registryId = registryId;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowUpdateRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/VersionedFlowUpdateRequestEntity.java
new file mode 100644 (file)
index 0000000..7211824
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.entity;
+
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.dto.VersionedFlowUpdateRequestDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "versionedFlowUpdateRequestEntity")
+public class VersionedFlowUpdateRequestEntity extends Entity {
+    private VersionedFlowUpdateRequestDTO request;
+    private RevisionDTO processGroupRevision;
+
+    @ApiModelProperty("The revision for the Process Group that owns this variable registry.")
+    public RevisionDTO getProcessGroupRevision() {
+        return processGroupRevision;
+    }
+
+    public void setProcessGroupRevision(RevisionDTO revision) {
+        this.processGroupRevision = revision;
+    }
+
+    @ApiModelProperty("The Versioned Flow Update Request")
+    public VersionedFlowUpdateRequestDTO getRequest() {
+        return request;
+    }
+
+    public void setRequest(VersionedFlowUpdateRequestDTO request) {
+        this.request = request;
+    }
+
+}
index 6e38860..804d59d 100644 (file)
@@ -33,7 +33,6 @@ import java.util.regex.Pattern;
 
 public class ControllerEndpointMerger extends AbstractSingleDTOEndpoint<ControllerEntity, ControllerDTO> {
     public static final Pattern CONTROLLER_URI_PATTERN = Pattern.compile("/nifi-api/site-to-site");
-    private PortEntityMerger portMerger = new PortEntityMerger();
 
     @Override
     protected Class<ControllerEntity> getEntityClass() {
@@ -47,7 +46,6 @@ public class ControllerEndpointMerger extends AbstractSingleDTOEndpoint<Controll
 
     @Override
     protected void mergeResponses(ControllerDTO clientDto, Map<NodeIdentifier, ControllerDTO> dtoMap, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses) {
-        ControllerDTO mergedController = clientDto;
         final Map<String, Map<NodeIdentifier, PortDTO>> inputPortMap = new HashMap<>(); // map of port id to map of node id to port dto
         final Map<String, Map<NodeIdentifier, PortDTO>> outputPortMap = new HashMap<>(); // map of port id to map of node id to port dto
 
index 22fa684..6ba2859 100644 (file)
@@ -47,7 +47,6 @@ public class FlowConfigurationEndpointMerger extends AbstractNodeStatusEndpoint<
     protected void mergeResponses(FlowConfigurationDTO clientDto, Map<NodeIdentifier, FlowConfigurationDTO> dtoMap, NodeIdentifier selectedNodeId) {
 
         for (final Map.Entry<NodeIdentifier, FlowConfigurationDTO> entry : dtoMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
             final FlowConfigurationDTO toMerge = entry.getValue();
             if (toMerge != clientDto) {
                 clientDto.setSupportsConfigurableAuthorizer(clientDto.getSupportsConfigurableAuthorizer() && toMerge.getSupportsConfigurableAuthorizer());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/VersionControlInformationEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/VersionControlInformationEndpointMerger.java
new file mode 100644 (file)
index 0000000..14e3bf6
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.VersionControlInformationEntityMerger;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
+
+public class VersionControlInformationEndpointMerger extends AbstractSingleEntityEndpoint<VersionControlInformationEntity> implements EndpointResponseMerger {
+    public static final Pattern VERSION_CONTROL_URI_PATTERN = Pattern.compile("/nifi-api/versions/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))");
+    private final VersionControlInformationEntityMerger versionControlInfoEntityMerger = new VersionControlInformationEntityMerger();
+
+    @Override
+    public boolean canHandle(final URI uri, final String method) {
+        if (("GET".equalsIgnoreCase(method) || "POST".equalsIgnoreCase(method)) && (VERSION_CONTROL_URI_PATTERN.matcher(uri.getPath()).matches())) {
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    protected Class<VersionControlInformationEntity> getEntityClass() {
+        return VersionControlInformationEntity.class;
+    }
+
+    @Override
+    protected void mergeResponses(final VersionControlInformationEntity clientEntity, final Map<NodeIdentifier, VersionControlInformationEntity> entityMap,
+        final Set<NodeResponse> successfulResponses, final Set<NodeResponse> problematicResponses) {
+
+        versionControlInfoEntityMerger.merge(clientEntity, entityMap);
+    }
+
+}
index 1fd6a49..a7177d4 100644 (file)
@@ -21,6 +21,7 @@ import java.net.URI;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 
 public interface RequestReplicator {
@@ -71,7 +72,6 @@ public interface RequestReplicator {
      */
     void shutdown();
 
-
     /**
      * Replicates a request to each node in the cluster. If the request attempts to modify the flow and there is a node
      * that is not currently connected, an Exception will be thrown. Otherwise, the returned AsyncClusterResponse object
@@ -89,24 +89,64 @@ public interface RequestReplicator {
     AsyncClusterResponse replicate(String method, URI uri, Object entity, Map<String, String> headers);
 
     /**
+     * Replicates a request to each node in the cluster. If the request attempts to modify the flow and there is a node
+     * that is not currently connected, an Exception will be thrown. Otherwise, the returned AsyncClusterResponse object
+     * will contain the results that are immediately available, as well as an identifier for obtaining an updated result
+     * later. NOTE: This method will ALWAYS indicate that the request has been replicated.
+     *
+     * @param user the user making the request
+     * @param method the HTTP method (e.g., POST, PUT)
+     * @param uri the base request URI (up to, but not including, the query string)
+     * @param entity an entity
+     * @param headers any HTTP headers
+     * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later
+     * @throws ConnectingNodeMutableRequestException if the request attempts to modify the flow and there is a node that is in the CONNECTING state
+     * @throws DisconnectedNodeMutableRequestException if the request attempts to modify the flow and there is a node that is in the DISCONNECTED state
+     */
+    AsyncClusterResponse replicate(NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers);
+
+    /**
+     * Requests are sent to each node in the given set of Node Identifiers. The returned AsyncClusterResponse object will contain
+     * the results that are immediately available, as well as an identifier for obtaining an updated result later.
+     * <p>
+     * HTTP DELETE, GET, HEAD, and OPTIONS methods will throw an IllegalArgumentException if used.
+     *
+     * @param nodeIds the node identifiers
+     * @param user the user making the request
+     * @param method the HTTP method (e.g., POST, PUT)
+     * @param uri the base request URI (up to, but not including, the query string)
+     * @param entity an entity
+     * @param headers any HTTP headers
+     * @param indicateReplicated if <code>true</code>, will add a header indicating to the receiving nodes that the request
+     *            has already been replicated, so the receiving node will not replicate the request itself.
+     * @param performVerification if <code>true</code>, and the request is mutable, will verify that all nodes are connected before
+     *            making the request and that all nodes are able to perform the request before acutally attempting to perform the task.
+     *            If false, will perform no such verification
+     * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later
+     */
+    AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated,
+        boolean performVerification);
+
+    /**
      * Requests are sent to each node in the given set of Node Identifiers. The returned AsyncClusterResponse object will contain
      * the results that are immediately available, as well as an identifier for obtaining an updated result later.
      * <p>
      * HTTP DELETE, GET, HEAD, and OPTIONS methods will throw an IllegalArgumentException if used.
      *
-     * @param nodeIds             the node identifiers
-     * @param method              the HTTP method (e.g., POST, PUT)
-     * @param uri                 the base request URI (up to, but not including, the query string)
-     * @param entity              an entity
-     * @param headers             any HTTP headers
-     * @param indicateReplicated  if <code>true</code>, will add a header indicating to the receiving nodes that the request
-     *                            has already been replicated, so the receiving node will not replicate the request itself.
+     * @param nodeIds the node identifiers
+     * @param method the HTTP method (e.g., POST, PUT)
+     * @param uri the base request URI (up to, but not including, the query string)
+     * @param entity an entity
+     * @param headers any HTTP headers
+     * @param indicateReplicated if <code>true</code>, will add a header indicating to the receiving nodes that the request
+     *            has already been replicated, so the receiving node will not replicate the request itself.
      * @param performVerification if <code>true</code>, and the request is mutable, will verify that all nodes are connected before
-     *                            making the request and that all nodes are able to perform the request before acutally attempting to perform the task.
-     *                            If false, will perform no such verification
+     *            making the request and that all nodes are able to perform the request before acutally attempting to perform the task.
+     *            If false, will perform no such verification
      * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later
      */
-    AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated, boolean performVerification);
+    AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean indicateReplicated,
+        boolean performVerification);
 
 
     /**
@@ -122,6 +162,19 @@ public interface RequestReplicator {
     AsyncClusterResponse forwardToCoordinator(NodeIdentifier coordinatorNodeId, String method, URI uri, Object entity, Map<String, String> headers);
 
     /**
+     * Forwards a request to the Cluster Coordinator so that it is able to replicate the request to all nodes in the cluster.
+     *
+     * @param coordinatorNodeId the node identifier of the Cluster Coordinator
+     * @param user the user making the request
+     * @param method the HTTP method (e.g., POST, PUT)
+     * @param uri the base request URI (up to, but not including, the query string)
+     * @param entity an entity
+     * @param headers any HTTP headers
+     * @return an AsyncClusterResponse that indicates the current status of the request and provides an identifier for obtaining an updated response later
+     */
+    AsyncClusterResponse forwardToCoordinator(NodeIdentifier coordinatorNodeId, NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers);
+
+    /**
      * <p>
      * Returns an AsyncClusterResponse that provides the most up-to-date status of the request with the given identifier.
      * If the request is finished, meaning that all nodes in the cluster have reported back their status or have timed out,
@@ -132,7 +185,7 @@ public interface RequestReplicator {
      *
      * @param requestIdentifier the identifier of the request to obtain a response for
      * @return an AsyncClusterResponse that provides the most up-to-date status of the request with the given identifier, or <code>null</code> if
-     * no request exists with the given identifier
+     *         no request exists with the given identifier
      */
     AsyncClusterResponse getClusterResponse(String requestIdentifier);
 }
index bd7729c..bd1e4b3 100644 (file)
@@ -195,8 +195,14 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         maintenanceExecutor.shutdown();
     }
 
+
     @Override
     public AsyncClusterResponse replicate(String method, URI uri, Object entity, Map<String, String> headers) {
+        return replicate(NiFiUserUtils.getNiFiUser(), method, uri, entity, headers);
+    }
+
+    @Override
+    public AsyncClusterResponse replicate(NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers) {
         final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = clusterCoordinator.getConnectionStates();
         final boolean mutable = isMutableRequest(method, uri.getPath());
 
@@ -237,11 +243,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
 
         final Set<NodeIdentifier> nodeIdSet = new HashSet<>(nodeIds);
 
-        return replicate(nodeIdSet, method, uri, entity, headers, true, true);
+        return replicate(nodeIdSet, user, method, uri, entity, headers, true, true);
     }
 
-    void updateRequestHeaders(final Map<String, String> headers) {
-        final NiFiUser user = NiFiUserUtils.getNiFiUser();
+    void updateRequestHeaders(final Map<String, String> headers, final NiFiUser user) {
         if (user == null) {
             throw new AccessDeniedException("Unknown user");
         }
@@ -279,6 +284,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
 
     @Override
     public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers,
+                final boolean indicateReplicated, final boolean performVerification) {
+
+        return replicate(nodeIds, NiFiUserUtils.getNiFiUser(), method, uri, entity, headers, indicateReplicated, performVerification);
+    }
+
+    @Override
+    public AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, final NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers,
                                           final boolean indicateReplicated, final boolean performVerification) {
         final Map<String, String> updatedHeaders = new HashMap<>(headers);
 
@@ -288,7 +300,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         }
 
         // include the proxied entities header
-        updateRequestHeaders(updatedHeaders);
+        updateRequestHeaders(updatedHeaders, user);
 
         if (indicateReplicated) {
             // If we are replicating a request and indicating that it is replicated, then this means that we are
@@ -324,12 +336,19 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         }
     }
 
+
     @Override
     public AsyncClusterResponse forwardToCoordinator(final NodeIdentifier coordinatorNodeId, final String method, final URI uri, final Object entity, final Map<String, String> headers) {
+        return forwardToCoordinator(coordinatorNodeId, NiFiUserUtils.getNiFiUser(), method, uri, entity, headers);
+    }
+
+    @Override
+    public AsyncClusterResponse forwardToCoordinator(final NodeIdentifier coordinatorNodeId, final NiFiUser user, final String method,
+                final URI uri, final Object entity, final Map<String, String> headers) {
         final Map<String, String> updatedHeaders = new HashMap<>(headers);
 
         // include the proxied entities header
-        updateRequestHeaders(updatedHeaders);
+        updateRequestHeaders(updatedHeaders, user);
 
         return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false, null);
     }
index 89ac179..7e3bc5d 100644 (file)
@@ -28,7 +28,6 @@ public class ConnectionEntityMerger implements ComponentEntityMerger<ConnectionE
     public void merge(ConnectionEntity clientEntity, Map<NodeIdentifier, ConnectionEntity> entityMap) {
         ComponentEntityMerger.super.merge(clientEntity, entityMap);
         for (Map.Entry<NodeIdentifier, ConnectionEntity> entry : entityMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
             final ConnectionEntity entityStatus = entry.getValue();
             if (entityStatus != clientEntity) {
                 mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey());
index 2929741..3df3c16 100644 (file)
@@ -31,7 +31,6 @@ public class PortEntityMerger implements ComponentEntityMerger<PortEntity>, Comp
     public void merge(PortEntity clientEntity, Map<NodeIdentifier, PortEntity> entityMap) {
         ComponentEntityMerger.super.merge(clientEntity, entityMap);
         for (Map.Entry<NodeIdentifier, PortEntity> entry : entityMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
             final PortEntity entityStatus = entry.getValue();
             if (entityStatus != clientEntity) {
                 mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey());
index 67278a7..457e75b 100644 (file)
@@ -17,6 +17,8 @@
 package org.apache.nifi.cluster.manager;
 
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 
@@ -31,6 +33,7 @@ public class ProcessGroupEntityMerger implements ComponentEntityMerger<ProcessGr
             final ProcessGroupEntity entityStatus = entry.getValue();
             if (entityStatus != clientEntity) {
                 mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey());
+                mergeVersionControlInformation(clientEntity, entityStatus);
             }
         }
     }
@@ -41,4 +44,18 @@ public class ProcessGroupEntityMerger implements ComponentEntityMerger<ProcessGr
         StatusMerger.merge(clientStatus, clientStatusReadablePermission, status, statusReadablePermission, statusNodeIdentifier.getId(), statusNodeIdentifier.getApiAddress(),
                 statusNodeIdentifier.getApiPort());
     }
+
+    private void mergeVersionControlInformation(ProcessGroupEntity targetGroup, ProcessGroupEntity toMerge) {
+        final ProcessGroupDTO targetGroupDto = targetGroup.getComponent();
+        final ProcessGroupDTO toMergeGroupDto = toMerge.getComponent();
+
+        final VersionControlInformationDTO targetVersionControl = targetGroupDto.getVersionControlInformation();
+        final VersionControlInformationDTO toMergeVersionControl = toMergeGroupDto.getVersionControlInformation();
+
+        if (targetVersionControl == null) {
+            targetGroupDto.setVersionControlInformation(toMergeGroupDto.getVersionControlInformation());
+        } else if (toMergeVersionControl != null) {
+            targetVersionControl.setCurrent(Boolean.TRUE.equals(targetVersionControl.getCurrent()) && Boolean.TRUE.equals(toMergeVersionControl.getCurrent()));
+        }
+    }
 }
index 5c419e9..dffac49 100644 (file)
@@ -32,7 +32,6 @@ public class ProcessorEntityMerger implements ComponentEntityMerger<ProcessorEnt
     public void merge(ProcessorEntity clientEntity, Map<NodeIdentifier, ProcessorEntity> entityMap) {
         ComponentEntityMerger.super.merge(clientEntity, entityMap);
         for (Map.Entry<NodeIdentifier, ProcessorEntity> entry : entityMap.entrySet()) {
-            final NodeIdentifier nodeId = entry.getKey();
             final ProcessorEntity entityStatus = entry.getValue();
             if (entityStatus != clientEntity) {
                 mergeStatus(clientEntity.getStatus(), clientEntity.getPermissions().getCanRead(), entry.getValue().getStatus(), entry.getValue().getPermissions().getCanRead(), entry.getKey());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/VersionControlInformationEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/VersionControlInformationEntityMerger.java
new file mode 100644 (file)
index 0000000..8d102df
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.manager;
+
+import java.util.Map;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
+import org.apache.nifi.web.api.entity.VersionControlInformationEntity;
+
+public class VersionControlInformationEntityMerger {
+
+    public void merge(final VersionControlInformationEntity clientEntity, final Map<NodeIdentifier, VersionControlInformationEntity> entityMap) {
+
+        final VersionControlInformationDTO clientDto = clientEntity.getVersionControlInformation();
+
+        // We need to merge the 'current' and 'modified' flags because these are updated by nodes in the background. Since
+        // the nodes can synchronize with the Flow Registry at different intervals, we have to determine how to handle these
+        // flags if different nodes report different values for them.
+        entityMap.values().stream()
+            .filter(entity -> entity != clientEntity)
+            .forEach(entity -> {
+                final VersionControlInformationDTO dto = entity.getVersionControlInformation();
+
+                // We consider the flow to be current only if ALL nodes indicate that it is current
+                clientDto.setCurrent(Boolean.TRUE.equals(clientDto.getCurrent()) && Boolean.TRUE.equals(dto.getCurrent()));
+
+                // We consider the flow to be modified if ANY node indicates that it is modified
+                clientDto.setModified(Boolean.TRUE.equals(clientDto.getModified()) || Boolean.TRUE.equals(dto.getModified()));
+            });
+    }
+
+}
index a86fc79..3e76de6 100644 (file)
@@ -25,14 +25,14 @@ import org.springframework.beans.factory.FactoryBean;
 /**
  * Factory bean for creating a singleton FileBasedClusterNodeFirewall instance.
  */
-public class FileBasedClusterNodeFirewallFactoryBean implements FactoryBean {
+public class FileBasedClusterNodeFirewallFactoryBean implements FactoryBean<FileBasedClusterNodeFirewall> {
 
     private FileBasedClusterNodeFirewall firewall;
 
     private NiFiProperties properties;
 
     @Override
-    public Object getObject() throws Exception {
+    public FileBasedClusterNodeFirewall getObject() throws Exception {
         if (firewall == null) {
             final File config = properties.getClusterNodeFirewallFile();
             final File restoreDirectory = properties.getRestoreDirectory();
@@ -44,7 +44,7 @@ public class FileBasedClusterNodeFirewallFactoryBean implements FactoryBean {
     }
 
     @Override
-    public Class getObjectType() {
+    public Class<FileBasedClusterNodeFirewall> getObjectType() {
         return FileBasedClusterNodeFirewall.class;
     }
 
index 836751c..1f0ceb5 100644 (file)
@@ -19,6 +19,7 @@ package org.apache.nifi.cluster.coordination.http.replication;
 import org.apache.commons.collections4.map.MultiValueMap;
 import org.apache.nifi.authorization.user.NiFiUser;
 import org.apache.nifi.authorization.user.NiFiUserDetails;
+import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.authorization.user.StandardNiFiUser;
 import org.apache.nifi.authorization.user.StandardNiFiUser.Builder;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
@@ -443,7 +444,7 @@ public class TestThreadPoolRequestReplicator {
 
                 // ensure the proxied entities header is set
                 final Map<String, String> updatedHeaders = new HashMap<>();
-                replicator.updateRequestHeaders(updatedHeaders);
+                replicator.updateRequestHeaders(updatedHeaders, NiFiUserUtils.getNiFiUser());
 
                 // Pass in Collections.emptySet() for the node ID's so that an Exception is thrown
                 replicator.replicate(Collections.emptySet(), "GET", new URI("localhost:8080/nifi"), Collections.emptyMap(),
@@ -501,7 +502,7 @@ public class TestThreadPoolRequestReplicator {
 
             // ensure the proxied entities header is set
             final Map<String, String> updatedHeaders = new HashMap<>();
-            replicator.updateRequestHeaders(updatedHeaders);
+            replicator.updateRequestHeaders(updatedHeaders, NiFiUserUtils.getNiFiUser());
 
             replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, updatedHeaders, true, null, true, true, monitor);
 
@@ -554,7 +555,7 @@ public class TestThreadPoolRequestReplicator {
 
             // ensure the proxied entities header is set
             final Map<String, String> updatedHeaders = new HashMap<>();
-            replicator.updateRequestHeaders(updatedHeaders);
+            replicator.updateRequestHeaders(updatedHeaders, NiFiUserUtils.getNiFiUser());
 
             replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, updatedHeaders, true, null, true, true, monitor);
 
index cace715..44d4905 100644 (file)
@@ -66,6 +66,7 @@ import org.apache.nifi.io.socket.SocketConfiguration;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.SystemBundle;
 import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.NiFiProperties;
@@ -147,7 +148,7 @@ public class Node {
         final HeartbeatMonitor heartbeatMonitor = createHeartbeatMonitor();
         flowController = FlowController.createClusteredInstance(Mockito.mock(FlowFileEventRepository.class), nodeProperties,
             null, null, StringEncryptor.createEncryptor(nodeProperties), protocolSender, Mockito.mock(BulletinRepository.class), clusterCoordinator,
-            heartbeatMonitor, electionManager, VariableRegistry.EMPTY_REGISTRY);
+            heartbeatMonitor, electionManager, VariableRegistry.EMPTY_REGISTRY, Mockito.mock(FlowRegistryClient.class));
 
         try {
             flowController.initializeFlow();
index 0cf5906..d1bce36 100644 (file)
@@ -58,5 +58,9 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-framework-authorization</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi.registry</groupId>
+            <artifactId>nifi-registry-data-model</artifactId>
+        </dependency>
     </dependencies>
 </project>
index 94e0745..a91ce97 100644 (file)
@@ -18,6 +18,7 @@ package org.apache.nifi.connectable;
 
 import org.apache.nifi.authorization.resource.ComponentAuthorizable;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.VersionedComponent;
 import org.apache.nifi.controller.Triggerable;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.ProcessSession;
@@ -32,11 +33,12 @@ import java.util.concurrent.TimeUnit;
 /**
  * Represents a connectable component to which or from which data can flow.
  */
-public interface Connectable extends Triggerable, ComponentAuthorizable, Positionable {
+public interface Connectable extends Triggerable, ComponentAuthorizable, Positionable, VersionedComponent {
 
     /**
      * @return the unique identifier for this <code>Connectable</code>
      */
+    @Override
     String getIdentifier();
 
     /**
index acdcec6..423f52d 100644 (file)
@@ -17,6 +17,7 @@
 package org.apache.nifi.connectable;
 
 import org.apache.nifi.authorization.resource.Authorizable;
+import org.apache.nifi.components.VersionedComponent;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.groups.ProcessGroup;
@@ -27,7 +28,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
-public interface Connection extends Authorizable {
+public interface Connection extends Authorizable, VersionedComponent {
 
     void enqueue(FlowFileRecord flowFile);
 
index 7190fd4..0240648 100644 (file)
@@ -42,6 +42,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -81,6 +82,7 @@ public abstract class AbstractPort implements Port {
     private final AtomicReference<String> penalizationPeriod;
     private final AtomicReference<String> yieldPeriod;
     private final AtomicReference<String> schedulingPeriod;
+    private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
     private final AtomicLong schedulingNanos;
     private final AtomicLong yieldExpiration;
     private final ProcessScheduler processScheduler;
@@ -635,4 +637,27 @@ public abstract class AbstractPort implements Port {
     @Override
     public void verifyCanClearState() {
     }
+
+    @Override
+    public Optional<String> getVersionedComponentId() {
+        return Optional.ofNullable(versionedComponentId.get());
+    }
+
+    @Override
+    public void setVersionedComponentId(final String versionedComponentId) {
+        boolean updated = false;
+        while (!updated) {
+            final String currentId = this.versionedComponentId.get();
+
+            if (currentId == null) {
+                updated = this.versionedComponentId.compareAndSet(null, versionedComponentId);
+            } else if (currentId.equals(versionedComponentId)) {
+                return;
+            } else if (versionedComponentId == null) {
+                updated = this.versionedComponentId.compareAndSet(currentId, null);
+            } else {
+                throw new IllegalStateException(this + " is already under version control");
+            }
+        }
+    }
 }
index 34ffbac..4b3507c 100644 (file)
@@ -43,6 +43,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -77,6 +78,7 @@ public class StandardFunnel implements Funnel {
     private final AtomicBoolean lossTolerant;
     private final AtomicReference<ScheduledState> scheduledState;
     private final AtomicLong yieldExpiration;
+    private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
 
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock readLock = rwLock.readLock();
@@ -557,4 +559,27 @@ public class StandardFunnel implements Funnel {
     public String getComponentType() {
         return "Funnel";
     }
+
+    @Override
+    public Optional<String> getVersionedComponentId() {
+        return Optional.ofNullable(versionedComponentId.get());
+    }
+
+    @Override
+    public void setVersionedComponentId(final String versionedComponentId) {
+        boolean updated = false;
+        while (!updated) {
+            final String currentId = this.versionedComponentId.get();
+
+            if (currentId == null) {
+                updated = this.versionedComponentId.compareAndSet(null, versionedComponentId);
+            } else if (currentId.equals(versionedComponentId)) {
+                return;
+            } else if (versionedComponentId == null) {
+                updated = this.versionedComponentId.compareAndSet(currentId, null);
+            } else {
+                throw new IllegalStateException(this + " is already under version control");
+            }
+        }
+    }
 }
index bc1be00..d463725 100644 (file)
 package org.apache.nifi.controller.label;
 
 import org.apache.nifi.authorization.resource.ComponentAuthorizable;
+import org.apache.nifi.components.VersionedComponent;
 import org.apache.nifi.connectable.Positionable;
 import org.apache.nifi.connectable.Size;
 import org.apache.nifi.groups.ProcessGroup;
 
 import java.util.Map;
 
-public interface Label extends ComponentAuthorizable, Positionable {
+public interface Label extends ComponentAuthorizable, Positionable, VersionedComponent {
 
+    @Override
     String getIdentifier();
 
     Map<String, String> getStyle();
index 3dd1076..2f28963 100644 (file)
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.controller.service;
 
+import org.apache.nifi.components.VersionedComponent;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.LoggableComponent;
@@ -26,7 +27,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 
-public interface ControllerServiceNode extends ConfiguredComponent {
+public interface ControllerServiceNode extends ConfiguredComponent, VersionedComponent {
 
     /**
      * @return the Process Group that this Controller Service belongs to, or <code>null</code> if the Controller Service
index 0baba23..8934788 100644 (file)
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.groups;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
 
 import org.apache.nifi.authorization.resource.ComponentAuthorizable;
+import org.apache.nifi.components.VersionedComponent;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.connectable.Funnel;
@@ -39,6 +41,10 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.registry.ComponentVariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.UnknownResourceException;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
 import org.apache.nifi.remote.RemoteGroupPort;
 
 /**
@@ -50,7 +56,7 @@ import org.apache.nifi.remote.RemoteGroupPort;
  * <p>
  * MUST BE THREAD-SAFE</p>
  */
-public interface ProcessGroup extends ComponentAuthorizable, Positionable {
+public interface ProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent {
 
     /**
      * Predicate for filtering schedulable Processors.
@@ -772,6 +778,17 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable {
     void move(final Snippet snippet, final ProcessGroup destination);
 
     /**
+     * Updates the Process Group to match the proposed flow
+     *
+     * @param proposedSnapshot the proposed flow
+     * @param componentIdSeed a seed value to use when generating ID's for new components
+     * @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If this value is <code>true</code>,
+     *            and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will
+     *            throw an IllegalStateException
+     */
+    void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty);
+
+    /**
      * Verifies a template with the specified name can be created.
      *
      * @param name name of the template
@@ -832,6 +849,18 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable {
     void verifyCanUpdateVariables(Map<String, String> updatedVariables);
 
     /**
+     * Ensure that the contents of the Process Group can be update to match the given new flow
+     *
+     * @param updatedFlow the updated version of the flow
+     * @param verifyConnectionRemoval whether or not to verify that connections that are not present in the updated flow can be removed
+     * @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If <code>true</code> and the Process Group has been changed since
+     *            it was last synchronized with the FlowRegistry, then this method will throw an IllegalStateException
+     *
+     * @throws IllegalStateException if the Process Group is not in a state that will allow the update
+     */
+    void verifyCanUpdate(VersionedFlowSnapshot updatedFlow, boolean verifyConnectionRemoval, boolean verifyNotDirty);
+
+    /**
      * Adds the given template to this Process Group
      *
      * @param template the template to add
@@ -894,4 +923,30 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable {
      * @return a set of all components that are affected by the variable with the given name
      */
     Set<ConfiguredComponent> getComponentsAffectedByVariable(String variableName);
+
+    /**
+     * @return the version control information that indicates where this flow is stored in a Flow Registry,
+     *         or <code>null</code> if this Process Group is not under version control.
+     */
+    VersionControlInformation getVersionControlInformation();
+
+    /**
+     * Updates the Version Control Information for this Process Group
+     *
+     * @param versionControlInformation specification of where the flow is tracked in Version Control
+     * @param versionedComponentIds a mapping of component ID's to Versioned Component ID's. This is used to update the components in the
+     *            Process Group so that the components that exist in the Process Group can be associated with the corresponding components in the
+     *            Version Controlled flow
+     */
+    void setVersionControlInformation(VersionControlInformation versionControlInformation, Map<String, String> versionedComponentIds);
+
+    /**
+     * Synchronizes the Process Group with the given Flow Registry, determining whether or not the local flow
+     * is up to date with the newest version of the flow in the Registry and whether or not the local flow has been
+     * modified since it was last synced with the Flow Registry. If this Process Group is not under Version Control,
+     * this method will have no effect.
+     *
+     * @param flowRegistry the Flow Registry to synchronize with
+     */
+    void synchronizeWithFlowRegistry(FlowRegistryClient flowRegistry);
 }
index 79b9509..e4da31b 100644 (file)
@@ -18,6 +18,7 @@ package org.apache.nifi.groups;
 
 import org.apache.nifi.authorization.resource.ComponentAuthorizable;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.VersionedComponent;
 import org.apache.nifi.connectable.Positionable;
 import org.apache.nifi.controller.exception.CommunicationsException;
 import org.apache.nifi.events.EventReporter;
@@ -30,7 +31,7 @@ import java.util.Date;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable {
+public interface RemoteProcessGroup extends ComponentAuthorizable, Positionable, VersionedComponent {
 
     @Override
     String getIdentifier();
index b797749..2f9a9fd 100644 (file)
@@ -40,6 +40,11 @@ public interface RemoteProcessGroupPortDescriptor {
     String getTargetId();
 
     /**
+     * @return the ID corresponding to the component that is under version control
+     */
+    String getVersionedComponentId();
+
+    /**
      * @return id of the remote process group that this port resides in
      */
     String getGroupId();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
new file mode 100644 (file)
index 0000000..a5bb738
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.io.IOException;
+
+public interface FlowRegistry {
+
+    /**
+     * @return the URL of the Flow Registry
+     */
+    String getURL();
+
+    /**
+     * Registers the given Versioned Flow with the Flow Registry
+     *
+     * @param flow the Versioned Flow to add to the registry
+     * @return the fully populated VersionedFlow
+     *
+     * @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier or name is null
+     * @throws UnknownResourceException if the bucket id does not exist
+     */
+    VersionedFlow registerVersionedFlow(VersionedFlow flow) throws IOException, UnknownResourceException;
+
+    /**
+     * Adds the given snapshot to the Flow Registry for the given flow
+     *
+     * @param flow the Versioned Flow
+     * @param snapshot the snapshot of the flow
+     * @param comments any comments for the snapshot
+     * @return the versioned flow snapshot
+     *
+     * @throws IOException if unable to communicate with the registry
+     * @throws NullPointerException if the VersionedFlow is null, or if its bucket identifier is null, or if the flow to snapshot is null
+     * @throws UnknownResourceException if the flow does not exist
+     */
+    VersionedFlowSnapshot registerVersionedFlowSnapshot(VersionedFlow flow, VersionedProcessGroup snapshot, String comments) throws IOException, UnknownResourceException;
+
+    /**
+     * Returns the latest (most recent) version of the Flow in the Flow Registry for the given bucket and flow
+     *
+     * @param bucketId the ID of the bucket
+     * @param flowId the ID of the flow
+     * @return the latest version of the Flow
+     *
+     * @throws IOException if unable to communicate with the Flow Registry
+     * @throws UnknownResourceException if unable to find the bucket with the given ID or the flow with the given ID
+     */
+    int getLatestVersion(String bucketId, String flowId) throws IOException, UnknownResourceException;
+
+    /**
+     * Retrieves the contents of the Flow with the given Bucket ID, Flow ID, and version, from the Flow Registry
+     *
+     * @param bucketId the ID of the bucket
+     * @param flowId the ID of the flow
+     * @param version the version to retrieve
+     * @return the contents of the Flow from the Flow Registry
+     *
+     * @throws IOException if unable to communicate with the Flow Registry
+     * @throws UnknownResourceException if unable to find the contents of the flow due to the bucket or flow not existing,
+     *             or the specified version of the flow not existing
+     * @throws NullPointerException if any of the arguments is not specified
+     * @throws IllegalArgumentException if the given version is less than 1
+     */
+    VersionedFlowSnapshot getFlowContents(String bucketId, String flowId, int version) throws IOException, UnknownResourceException;
+
+    /**
+     * Retrieves a VersionedFlow by bucket id and flow id
+     *
+     * @param bucketId the ID of the bucket
+     * @param flowId the ID of the flow
+     * @return the VersionedFlow for the given bucket and flow ID's
+     *
+     * @throws IOException if unable to communicate with the Flow Registry
+     * @throws UnknownResourceException if unable to find a flow with the given bucket ID and flow ID
+     */
+    VersionedFlow getVersionedFlow(String bucketId, String flowId) throws IOException, UnknownResourceException;
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
new file mode 100644 (file)
index 0000000..83f66dc
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.util.Set;
+
+public interface FlowRegistryClient {
+    FlowRegistry getFlowRegistry(String registryId);
+
+    default String getFlowRegistryId(String registryUrl) {
+        for (final String registryClientId : getRegistryIdentifiers()) {
+            final FlowRegistry registry = getFlowRegistry(registryClientId);
+            if (registry.getURL().equals(registryUrl)) {
+                return registryClientId;
+            }
+        }
+
+        return null;
+    }
+
+    Set<String> getRegistryIdentifiers();
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/UnknownResourceException.java
new file mode 100644 (file)
index 0000000..8c95e67
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+public class UnknownResourceException extends Exception {
+
+    public UnknownResourceException(String message) {
+        super(message);
+    }
+
+    public UnknownResourceException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public UnknownResourceException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/VersionControlInformation.java
new file mode 100644 (file)
index 0000000..ea70b1c
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.util.Optional;
+
+/**
+ * <p>
+ * Provides a mechanism for conveying which Flow Registry a flow is stored in, and
+ * where in the Flow Registry the flow is stored.
+ * </p>
+ */
+public interface VersionControlInformation {
+
+    /**
+     * @return the unique identifier of the Flow Registry that this flow is tracking to
+     */
+    String getRegistryIdentifier();
+
+    /**
+     * @return the unique identifier of the bucket that this flow belongs to
+     */
+    String getBucketIdentifier();
+
+    /**
+     * @return the unique identifier of this flow in the Flow Registry
+     */
+    String getFlowIdentifier();
+
+    /**
+     * @return the version of the flow in the Flow Registry that this flow is based on.
+     */
+    int getVersion();
+
+    /**
+     * @return <code>true</code> if the flow has been modified since the last time that it was updated from the Flow Registry or saved
+     *         to the Flow Registry; <code>false</code> if the flow is in sync with the Flow Registry. An empty optional will be returned
+     *         if it is not yet known whether or not the flow has been modified (for example, on startup, when the flow has not yet been
+     *         fetched from the Flow Registry)
+     */
+    Optional<Boolean> getModified();
+
+    /**
+     * @return <code>true</code> if this version of the flow is the most recent version of the flow available in the Flow Registry.
+     *         An empty optional will be returned if it is not yet known whether or not the flow has been modified (for example, on startup,
+     *         when the flow has not yet been fetched from the Flow Registry)
+     */
+    Optional<Boolean> getCurrent();
+
+    /**
+     * @return the snapshot of the flow that was synchronized with the Flow Registry
+     */
+    VersionedProcessGroup getFlowSnapshot();
+}
index d53eb49..09d032e 100644 (file)
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-recipes</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi.registry</groupId>
+            <artifactId>nifi-registry-data-model</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi.registry</groupId>
+            <artifactId>nifi-registry-flow-diff</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.curator</groupId>
index 728e8cf..7aa3003 100644 (file)
@@ -47,6 +47,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -71,6 +72,7 @@ public final class StandardConnection implements Connection {
     private final StandardFlowFileQueue flowFileQueue;
     private final AtomicInteger labelIndex = new AtomicInteger(1);
     private final AtomicLong zIndex = new AtomicLong(0L);
+    private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
     private final ProcessScheduler scheduler;
     private final int hashCode;
 
@@ -519,4 +521,27 @@ public final class StandardConnection implements Connection {
             }
         }
     }
+
+    @Override
+    public Optional<String> getVersionedComponentId() {
+        return Optional.ofNullable(versionedComponentId.get());
+    }
+
+    @Override
+    public void setVersionedComponentId(final String versionedComponentId) {
+        boolean updated = false;
+        while (!updated) {
+            final String currentId = this.versionedComponentId.get();
+
+            if (currentId == null) {
+                updated = this.versionedComponentId.compareAndSet(null, versionedComponentId);
+            } else if (currentId.equals(versionedComponentId)) {
+                return;
+            } else if (versionedComponentId == null) {
+                updated = this.versionedComponentId.compareAndSet(currentId, null);
+            } else {
+                throw new IllegalStateException(this + " is already under version control");
+            }
+        }
+    }
 }
index 56b2590..242ef6a 100644 (file)
@@ -164,6 +164,7 @@ import org.apache.nifi.provenance.ProvenanceRepository;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
 import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.variable.MutableVariableRegistry;
 import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
 import org.apache.nifi.remote.HttpRemoteSiteListener;
@@ -329,6 +330,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
     private final LeaderElectionManager leaderElectionManager;
     private final ClusterCoordinator clusterCoordinator;
+    private final FlowRegistryClient flowRegistryClient;
 
     /**
      * true if controller is configured to operate in a clustered environment
@@ -395,7 +397,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             final AuditService auditService,
             final StringEncryptor encryptor,
             final BulletinRepository bulletinRepo,
-            final VariableRegistry variableRegistry) {
+            final VariableRegistry variableRegistry,
+            final FlowRegistryClient flowRegistryClient) {
 
         return new FlowController(
                 flowFileEventRepo,
@@ -409,7 +412,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 /* cluster coordinator */ null,
                 /* heartbeat monitor */ null,
                 /* leader election manager */ null,
-                /* variable registry */ variableRegistry);
+                /* variable registry */ variableRegistry,
+                flowRegistryClient);
     }
 
     public static FlowController createClusteredInstance(
@@ -423,7 +427,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             final ClusterCoordinator clusterCoordinator,
             final HeartbeatMonitor heartbeatMonitor,
             final LeaderElectionManager leaderElectionManager,
-            final VariableRegistry variableRegistry) {
+            final VariableRegistry variableRegistry,
+            final FlowRegistryClient flowRegistryClient) {
 
         final FlowController flowController = new FlowController(
                 flowFileEventRepo,
@@ -437,7 +442,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 clusterCoordinator,
                 heartbeatMonitor,
                 leaderElectionManager,
-                variableRegistry);
+                variableRegistry,
+                flowRegistryClient);
 
         return flowController;
     }
@@ -454,7 +460,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             final ClusterCoordinator clusterCoordinator,
             final HeartbeatMonitor heartbeatMonitor,
             final LeaderElectionManager leaderElectionManager,
-            final VariableRegistry variableRegistry) {
+            final VariableRegistry variableRegistry,
+            final FlowRegistryClient flowRegistryClient) {
 
         maxTimerDrivenThreads = new AtomicInteger(10);
         maxEventDrivenThreads = new AtomicInteger(5);
@@ -516,6 +523,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         startRemoteGroupPortsAfterInitialization = new ArrayList<>();
         this.authorizer = authorizer;
         this.auditService = auditService;
+        this.flowRegistryClient = flowRegistryClient;
 
         final String gracefulShutdownSecondsVal = nifiProperties.getProperty(GRACEFUL_SHUTDOWN_PERIOD);
         long shutdownSecs;
@@ -754,6 +762,23 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 }
             }, 0L, 30L, TimeUnit.SECONDS);
 
+            timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
+                @Override
+                public void run() {
+                    final ProcessGroup rootGroup = getRootGroup();
+                    final List<ProcessGroup> allGroups = rootGroup.findAllProcessGroups();
+                    allGroups.add(rootGroup);
+
+                    for (final ProcessGroup group : allGroups) {
+                        try {
+                            group.synchronizeWithFlowRegistry(flowRegistryClient);
+                        } catch (final Exception e) {
+                            LOG.error("Failed to synchronize {} with Flow Registry", group, e);
+                        }
+                    }
+                }
+            }, 5, 60, TimeUnit.SECONDS);
+
             initialized.set(true);
         } finally {
             writeLock.unlock();
@@ -3311,6 +3336,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return new HashSet<>(reportingTasks.values());
     }
 
+    public FlowRegistryClient getFlowRegistryClient() {
+        return flowRegistryClient;
+    }
+
     @Override
     public ControllerServiceNode createControllerService(final String type, final String id, final BundleCoordinate bundleCoordinate, final Set<URL> additionalUrls, final boolean firstTimeAdded) {
         final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, bundleCoordinate, additionalUrls, firstTimeAdded);
index 3a0b093..e879e38 100644 (file)
@@ -85,6 +85,8 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.registry.flow.StandardVersionControlInformation;
+import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
@@ -113,6 +115,7 @@ import org.apache.nifi.web.api.dto.ProcessorDTO;
 import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.apache.nifi.web.api.dto.TemplateDTO;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
@@ -1048,6 +1051,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         final ProcessGroupDTO processGroupDTO = FlowFromDOMFactory.getProcessGroup(parentId, processGroupElement, encryptor, encodingVersion);
         final ProcessGroup processGroup = controller.createProcessGroup(processGroupDTO.getId());
         processGroup.setComments(processGroupDTO.getComments());
+        processGroup.setVersionedComponentId(processGroupDTO.getVersionedComponentId());
         processGroup.setPosition(toPosition(processGroupDTO.getPosition()));
         processGroup.setName(processGroupDTO.getName());
         processGroup.setParent(parentGroup);
@@ -1072,6 +1076,20 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
 
         processGroup.setVariables(variables);
 
+        final VersionControlInformationDTO versionControlInfoDto = processGroupDTO.getVersionControlInformation();
+        if (versionControlInfoDto != null) {
+            final String registryId = versionControlInfoDto.getRegistryId();
+            final String bucketId = versionControlInfoDto.getBucketId();
+            final String flowId = versionControlInfoDto.getFlowId();
+            final int version = versionControlInfoDto.getVersion();
+            final boolean modified = false;
+            final boolean current = true;
+
+            final VersionControlInformation versionControlInformation = new StandardVersionControlInformation(registryId, bucketId, flowId, version, null, modified, current);
+            // pass empty map for the version control mapping because the VersionedComponentId has already been set on the components
+            processGroup.setVersionControlInformation(versionControlInformation, Collections.emptyMap());
+        }
+
         // Add Controller Services
         final List<Element> serviceNodeList = getChildrenByTagName(processGroupElement, "controllerService");
         if (!serviceNodeList.isEmpty()) {
@@ -1097,6 +1115,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
             }
 
             final ProcessorNode procNode = controller.createProcessor(processorDTO.getType(), processorDTO.getId(), coordinate, false);
+            procNode.setVersionedComponentId(processorDTO.getVersionedComponentId());
             processGroup.addProcessor(procNode);
             updateProcessor(procNode, processorDTO, processGroup, controller);
         }
@@ -1113,6 +1132,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
                 port = controller.createLocalInputPort(portDTO.getId(), portDTO.getName());
             }
 
+            port.setVersionedComponentId(portDTO.getVersionedComponentId());
             port.setPosition(toPosition(portDTO.getPosition()));
             port.setComments(portDTO.getComments());
             port.setProcessGroup(processGroup);
@@ -1156,6 +1176,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
             } else {
                 port = controller.createLocalOutputPort(portDTO.getId(), portDTO.getName());
             }
+
+            port.setVersionedComponentId(portDTO.getVersionedComponentId());
             port.setPosition(toPosition(portDTO.getPosition()));
             port.setComments(portDTO.getComments());
             port.setProcessGroup(processGroup);
@@ -1193,6 +1215,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         for (final Element funnelElement : funnelNodeList) {
             final FunnelDTO funnelDTO = FlowFromDOMFactory.getFunnel(funnelElement);
             final Funnel funnel = controller.createFunnel(funnelDTO.getId());
+            funnel.setVersionedComponentId(funnelDTO.getVersionedComponentId());
             funnel.setPosition(toPosition(funnelDTO.getPosition()));
 
             // Since this is called during startup, we want to add the funnel without enabling it
@@ -1207,6 +1230,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         for (final Element labelElement : labelNodeList) {
             final LabelDTO labelDTO = FlowFromDOMFactory.getLabel(labelElement);
             final Label label = controller.createLabel(labelDTO.getId(), labelDTO.getLabel());
+            label.setVersionedComponentId(labelDTO.getVersionedComponentId());
             label.setStyle(labelDTO.getStyle());
 
             label.setPosition(toPosition(labelDTO.getPosition()));
@@ -1225,6 +1249,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         for (final Element remoteProcessGroupElement : remoteProcessGroupNodeList) {
             final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteProcessGroupElement, encryptor);
             final RemoteProcessGroup remoteGroup = controller.createRemoteProcessGroup(remoteGroupDto.getId(), remoteGroupDto.getTargetUris());
+            remoteGroup.setVersionedComponentId(remoteGroupDto.getVersionedComponentId());
             remoteGroup.setComments(remoteGroupDto.getComments());
             remoteGroup.setPosition(toPosition(remoteGroupDto.getPosition()));
             final String name = remoteGroupDto.getName();
@@ -1332,6 +1357,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
             }
 
             final Connection connection = controller.createConnection(dto.getId(), dto.getName(), source, destination, dto.getSelectedRelationships());
+            connection.setVersionedComponentId(dto.getVersionedComponentId());
             connection.setProcessGroup(processGroup);
 
             final List<Position> bendPoints = new ArrayList<>();
index 88912aa..187b62f 100644 (file)
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -126,6 +127,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
     private final AtomicInteger concurrentTaskCount;
     private final AtomicLong yieldExpiration;
     private final AtomicLong schedulingNanos;
+    private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
     private final ProcessScheduler processScheduler;
     private long runNanos = 0L;
     private volatile long yieldNanos;
@@ -1511,4 +1513,26 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
         return group == null ? null : group.getIdentifier();
     }
 
+    @Override
+    public Optional<String> getVersionedComponentId() {
+        return Optional.ofNullable(versionedComponentId.get());
+    }
+
+    @Override
+    public void setVersionedComponentId(final String versionedComponentId) {
+        boolean updated = false;
+        while (!updated) {
+            final String currentId = this.versionedComponentId.get();
+
+            if (currentId == null) {
+                updated = this.versionedComponentId.compareAndSet(null, versionedComponentId);
+            } else if (currentId.equals(versionedComponentId)) {
+                return;
+            } else if (versionedComponentId == null) {
+                updated = this.versionedComponentId.compareAndSet(currentId, null);
+            } else {
+                throw new IllegalStateException(this + " is already under version control");
+            }
+        }
+    }
 }
index 32e742d..2e98e84 100644 (file)
@@ -28,6 +28,7 @@ import org.apache.nifi.util.CharacterFilterUtils;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class StandardLabel implements Label {
@@ -38,6 +39,7 @@ public class StandardLabel implements Label {
     private final AtomicReference<Map<String, String>> style;
     private final AtomicReference<String> value;
     private final AtomicReference<ProcessGroup> processGroup;
+    private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
 
     public StandardLabel(final String identifier, final String value) {
         this(identifier, new Position(0D, 0D), new HashMap<String, String>(), value, null);
@@ -76,6 +78,7 @@ public class StandardLabel implements Label {
         }
     }
 
+    @Override
     public String getIdentifier() {
         return identifier;
     }
@@ -96,10 +99,12 @@ public class StandardLabel implements Label {
         return ResourceFactory.getComponentResource(ResourceType.Label, getIdentifier(),"Label");
     }
 
+    @Override
     public Map<String, String> getStyle() {
         return style.get();
     }
 
+    @Override
     public void setStyle(final Map<String, String> style) {
         if (style != null) {
             boolean updated = false;
@@ -112,19 +117,46 @@ public class StandardLabel implements Label {
         }
     }
 
+    @Override
     public String getValue() {
         return value.get();
     }
 
+    @Override
     public void setValue(final String value) {
         this.value.set(CharacterFilterUtils.filterInvalidXmlCharacters(value));
     }
 
+    @Override
     public void setProcessGroup(final ProcessGroup group) {
         this.processGroup.set(group);
     }
 
+    @Override
     public ProcessGroup getProcessGroup() {
         return processGroup.get();
     }
+
+    @Override
+    public Optional<String> getVersionedComponentId() {
+        return Optional.ofNullable(versionedComponentId.get());
+    }
+
+    @Override
+    public void setVersionedComponentId(final String versionedComponentId) {
+        boolean updated = false;
+        while (!updated) {
+            final String currentId = this.versionedComponentId.get();
+
+            if (currentId == null) {
+                updated = this.versionedComponentId.compareAndSet(null, versionedComponentId);
+            } else if (currentId.equals(versionedComponentId)) {
+                return;
+            } else if (versionedComponentId == null) {
+                updated = this.versionedComponentId.compareAndSet(currentId, null);
+            } else {
+                throw new IllegalStateException(this + " is already under version control");
+            }
+        }
+    }
 }
index f67ecd9..a2a589a 100644 (file)
@@ -50,6 +50,7 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
 import org.apache.nifi.web.api.dto.ReportingTaskDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
 import org.w3c.dom.Element;
 import org.w3c.dom.NodeList;
 
@@ -102,6 +103,7 @@ public class FlowFromDOMFactory {
         final ControllerServiceDTO dto = new ControllerServiceDTO();
 
         dto.setId(getString(element, "id"));
+        dto.setVersionedComponentId(getString(element, "versionedComponentId"));
         dto.setName(getString(element, "name"));
         dto.setComments(getString(element, "comment"));
         dto.setType(getString(element, "class"));
@@ -138,6 +140,7 @@ public class FlowFromDOMFactory {
         final ProcessGroupDTO dto = new ProcessGroupDTO();
         final String groupId = getString(element, "id");
         dto.setId(groupId);
+        dto.setVersionedComponentId(getString(element, "versionedComponentId"));
         dto.setParentGroupId(parentId);
         dto.setName(getString(element, "name"));
         dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
@@ -153,6 +156,9 @@ public class FlowFromDOMFactory {
         }
         dto.setVariables(variables);
 
+        final Element versionControlInfoElement = DomUtils.getChild(element, "versionControlInformation");
+        dto.setVersionControlInformation(getVersionControlInformation(versionControlInfoElement));
+
         final Set<ProcessorDTO> processors = new HashSet<>();
         final Set<ConnectionDTO> connections = new HashSet<>();
         final Set<FunnelDTO> funnels = new HashSet<>();
@@ -216,12 +222,26 @@ public class FlowFromDOMFactory {
         return dto;
     }
 
+    private static VersionControlInformationDTO getVersionControlInformation(final Element versionControlInfoElement) {
+        if (versionControlInfoElement == null) {
+            return null;
+        }
+
+        final VersionControlInformationDTO dto = new VersionControlInformationDTO();
+        dto.setRegistryId(getString(versionControlInfoElement, "registryId"));
+        dto.setBucketId(getString(versionControlInfoElement, "bucketId"));
+        dto.setFlowId(getString(versionControlInfoElement, "flowId"));
+        dto.setVersion(getInt(versionControlInfoElement, "version"));
+        return dto;
+    }
+
     public static ConnectionDTO getConnection(final Element element) {
         final ConnectionDTO dto = new ConnectionDTO();
         dto.setId(getString(element, "id"));
         dto.setName(getString(element, "name"));
         dto.setLabelIndex(getOptionalInt(element, "labelIndex"));
         dto.setzIndex(getOptionalLong(element, "zIndex"));
+        dto.setVersionedComponentId(getString(element, "versionedComponentId"));
 
         final List<PositionDTO> bends = new ArrayList<>();
         final Element bendPointsElement = DomUtils.getChild(element, "bendPoints");
@@ -278,6 +298,7 @@ public class FlowFromDOMFactory {
     public static RemoteProcessGroupDTO getRemoteProcessGroup(final Element element, final StringEncryptor encryptor) {
         final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
         dto.setId(getString(element, "id"));
+        dto.setVersionedComponentId(getString(element, "versionedComponentId"));
         dto.setName(getString(element, "name"));
         dto.setTargetUri(getString(element, "url"));
         dto.setTargetUris(getString(element, "urls"));
@@ -302,6 +323,7 @@ public class FlowFromDOMFactory {
     public static LabelDTO getLabel(final Element element) {
         final LabelDTO dto = new LabelDTO();
         dto.setId(getString(element, "id"));
+        dto.setVersionedComponentId(getString(element, "versionedComponentId"));
         dto.setLabel(getString(element, "value"));
         dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
         final Size size = getSize(DomUtils.getChild(element, "size"));
@@ -315,6 +337,7 @@ public class FlowFromDOMFactory {
     public static FunnelDTO getFunnel(final Element element) {
         final FunnelDTO dto = new FunnelDTO();
         dto.setId(getString(element, "id"));
+        dto.setVersionedComponentId(getString(element, "versionedComponentId"));
         dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
 
         return dto;
@@ -323,6 +346,7 @@ public class FlowFromDOMFactory {
     public static PortDTO getPort(final Element element) {
         final PortDTO portDTO = new PortDTO();
         portDTO.setId(getString(element, "id"));
+        portDTO.setVersionedComponentId(getString(element, "versionedComponentId"));
         portDTO.setPosition(getPosition(DomUtils.getChild(element, "position")));
         portDTO.setName(getString(element, "name"));
         portDTO.setComments(getString(element, "comments"));
@@ -370,6 +394,7 @@ public class FlowFromDOMFactory {
         final String targetId = getString(element, "targetId");
         descriptor.setTargetId(targetId == null ? id : targetId);
 
+        descriptor.setVersionedComponentId(getString(element, "versionedComponentId"));
         descriptor.setName(getString(element, "name"));
         descriptor.setComments(getString(element, "comments"));
         descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks"));
@@ -386,6 +411,7 @@ public class FlowFromDOMFactory {
         final ProcessorDTO dto = new ProcessorDTO();
 
         dto.setId(getString(element, "id"));
+        dto.setVersionedComponentId(getString(element, "versionedComponentId"));
         dto.setName(getString(element, "name"));
         dto.setType(getString(element, "class"));
         dto.setBundle(getBundle(DomUtils.getChild(element, "bundle")));
index bc28a25..ecf2438 100644 (file)
@@ -39,6 +39,7 @@ import org.apache.nifi.persistence.TemplateSerializer;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.registry.VariableDescriptor;
 import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.VersionControlInformation;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RootGroupPort;
 import org.apache.nifi.util.CharacterFilterUtils;
@@ -63,6 +64,7 @@ import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -151,10 +153,21 @@ public class StandardFlowSerializer implements FlowSerializer {
         final Element element = doc.createElement(elementName);
         parentElement.appendChild(element);
         addTextElement(element, "id", group.getIdentifier());
+        addTextElement(element, "versionedComponentId", group.getVersionedComponentId());
         addTextElement(element, "name", group.getName());
         addPosition(element, group.getPosition());
         addTextElement(element, "comment", group.getComments());
 
+        final VersionControlInformation versionControlInfo = group.getVersionControlInformation();
+        if (versionControlInfo != null) {
+            final Element versionControlInfoElement = doc.createElement("versionControlInformation");
+            addTextElement(versionControlInfoElement, "registryId", versionControlInfo.getRegistryIdentifier());
+            addTextElement(versionControlInfoElement, "bucketId", versionControlInfo.getBucketIdentifier());
+            addTextElement(versionControlInfoElement, "flowId", versionControlInfo.getFlowIdentifier());
+            addTextElement(versionControlInfoElement, "version", versionControlInfo.getVersion());
+            element.appendChild(versionControlInfoElement);
+        }
+
         for (final ProcessorNode processor : group.getProcessors()) {
             addProcessor(element, processor, scheduledStateLookup);
         }
@@ -258,6 +271,7 @@ public class StandardFlowSerializer implements FlowSerializer {
         final Element element = doc.createElement("label");
         parentElement.appendChild(element);
         addTextElement(element, "id", label.getIdentifier());
+        addTextElement(element, "versionedComponentId", label.getVersionedComponentId());
 
         addPosition(element, label.getPosition());
         addSize(element, label.getSize());
@@ -272,6 +286,7 @@ public class StandardFlowSerializer implements FlowSerializer {
         final Element element = doc.createElement("funnel");
         parentElement.appendChild(element);
         addTextElement(element, "id", funnel.getIdentifier());
+        addTextElement(element, "versionedComponentId", funnel.getVersionedComponentId());
         addPosition(element, funnel.getPosition());
     }
 
@@ -280,6 +295,7 @@ public class StandardFlowSerializer implements FlowSerializer {
         final Element element = doc.createElement("remoteProcessGroup");
         parentElement.appendChild(element);
         addTextElement(element, "id", remoteRef.getIdentifier());
+        addTextElement(element, "versionedComponentId", remoteRef.getVersionedComponentId());
         addTextElement(element, "name", remoteRef.getName());
         addPosition(element, remoteRef.getPosition());
         addTextElement(element, "comment", remoteRef.getComments());
@@ -322,6 +338,7 @@ public class StandardFlowSerializer implements FlowSerializer {
         final Element element = doc.createElement(elementName);
         parentElement.appendChild(element);
         addTextElement(element, "id", port.getIdentifier());
+        addTextElement(element, "versionedComponentId", port.getVersionedComponentId());
         addTextElement(element, "name", port.getName());
         addPosition(element, port.getPosition());
         addTextElement(element, "comments", port.getComments());
@@ -350,6 +367,7 @@ public class StandardFlowSerializer implements FlowSerializer {
         final Element element = doc.createElement(elementName);
         parentElement.appendChild(element);
         addTextElement(element, "id", port.getIdentifier());
+        addTextElement(element, "versionedComponentId", port.getVersionedComponentId());
         addTextElement(element, "name", port.getName());
         addPosition(element, port.getPosition());
         addTextElement(element, "comments", port.getComments());
@@ -363,6 +381,7 @@ public class StandardFlowSerializer implements FlowSerializer {
         final Element element = doc.createElement(elementName);
         parentElement.appendChild(element);
         addTextElement(element, "id", port.getIdentifier());
+        addTextElement(element, "versionedComponentId", port.getVersionedComponentId());
         addTextElement(element, "name", port.getName());
         addPosition(element, port.getPosition());
         addTextElement(element, "comments", port.getComments());
@@ -383,6 +402,7 @@ public class StandardFlowSerializer implements FlowSerializer {
         final Element element = doc.createElement("processor");
         parentElement.appendChild(element);
         addTextElement(element, "id", processor.getIdentifier());
+        addTextElement(element, "versionedComponentId", processor.getVersionedComponentId());
         addTextElement(element, "name", processor.getName());
 
         addPosition(element, processor.getPosition());
@@ -444,6 +464,7 @@ public class StandardFlowSerializer implements FlowSerializer {
         final Element element = doc.createElement("connection");
         parentElement.appendChild(element);
         addTextElement(element, "id", connection.getIdentifier());
+        addTextElement(element, "versionedComponentId", connection.getVersionedComponentId());
         addTextElement(element, "name", connection.getName());
 
         final Element bendPointsElement = doc.createElement("bendPoints");
@@ -500,6 +521,7 @@ public class StandardFlowSerializer implements FlowSerializer {
     public void addControllerService(final Element element, final ControllerServiceNode serviceNode) {
         final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
         addTextElement(serviceElement, "id", serviceNode.getIdentifier());
+        addTextElement(serviceElement, "versionedComponentId", serviceNode.getVersionedComponentId());
         addTextElement(serviceElement, "name", serviceNode.getName());
         addTextElement(serviceElement, "comment", serviceNode.getComments());
         addTextElement(serviceElement, "class", serviceNode.getCanonicalClassName());
@@ -544,6 +566,17 @@ public class StandardFlowSerializer implements FlowSerializer {
         element.appendChild(toAdd);
     }
 
+    private static void addTextElement(final Element element, final String name, final Optional<String> value) {
+        if (!value.isPresent()) {
+            return;
+        }
+
+        final Document doc = element.getOwnerDocument();
+        final Element toAdd = doc.createElement(name);
+        toAdd.setTextContent(CharacterFilterUtils.filterInvalidXmlCharacters(value.get())); // value should already be filtered, but just in case ensure there are no invalid xml characters
+        element.appendChild(toAdd);
+    }
+
     public static void addTemplate(final Element element, final Template template) {
         try {
             final byte[] serialized = TemplateSerializer.serialize(template.getDetails());
index 3faffd7..633f0ed 100644 (file)
@@ -198,6 +198,7 @@ public class ControllerServiceLoader {
         final ControllerServiceNode node = provider.createControllerService(dto.getType(), dto.getId(), coordinate, Collections.emptySet(), false);
         node.setName(dto.getName());
         node.setComments(dto.getComments());
+        node.setVersionedComponentId(dto.getVersionedComponentId());
         return node;
     }
 
index f46f796..53fd166 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
@@ -72,6 +73,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     private final AtomicReference<ControllerServiceDetails> controllerServiceHolder = new AtomicReference<>(null);
     private final ControllerServiceProvider serviceProvider;
     private final ServiceStateTransition stateTransition = new ServiceStateTransition();
+    private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
 
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock readLock = rwLock.readLock();
@@ -526,4 +528,26 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
         return results != null ? results : Collections.emptySet();
     }
 
+    @Override
+    public Optional<String> getVersionedComponentId() {
+        return Optional.ofNullable(versionedComponentId.get());
+    }
+
+    @Override
+    public void setVersionedComponentId(final String versionedComponentId) {
+        boolean updated = false;
+        while (!updated) {
+            final String currentId = this.versionedComponentId.get();
+
+            if (currentId == null) {
+                updated = this.versionedComponentId.compareAndSet(null, versionedComponentId);
+            } else if (currentId.equals(versionedComponentId)) {
+                return;
+            } else if (versionedComponentId == null) {
+                updated = this.versionedComponentId.compareAndSet(currentId, null);
+            } else {
+                throw new IllegalStateException(this + " is already under version control");
+            }
+        }
+    }
 }
index 7c68475..3aa5084 100644 (file)
@@ -268,6 +268,17 @@ public class FingerprintFactory {
     private StringBuilder addProcessGroupFingerprint(final StringBuilder builder, final Element processGroupElem, final FlowController controller) throws FingerprintException {
         // id
         appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "id"));
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "versionedComponentId"));
+
+        final Element versionControlInfo = DomUtils.getChild(processGroupElem, "versionControlInformation");
+        if (versionControlInfo == null) {
+            builder.append("NO_VERSION_CONTROL_INFORMATION");
+        } else {
+            appendFirstValue(builder, DomUtils.getChildNodesByTagName(versionControlInfo, "registryId"));
+            appendFirstValue(builder, DomUtils.getChildNodesByTagName(versionControlInfo, "bucketId"));
+            appendFirstValue(builder, DomUtils.getChildNodesByTagName(versionControlInfo, "flowId"));
+            appendFirstValue(builder, DomUtils.getChildNodesByTagName(versionControlInfo, "version"));
+        }
 
         // processors
         final List<Element> processorElems = DomUtils.getChildElementsByTagName(processGroupElem, "processor");
@@ -344,6 +355,7 @@ public class FingerprintFactory {
     private StringBuilder addFlowFileProcessorFingerprint(final StringBuilder builder, final Element processorElem) throws FingerprintException {
         // id
         appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "id"));
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "versionedComponentId"));
         // class
         final NodeList childNodes = DomUtils.getChildNodesByTagName(processorElem, "class");
         final String className = childNodes.item(0).getTextContent();
@@ -435,6 +447,7 @@ public class FingerprintFactory {
     private StringBuilder addPortFingerprint(final StringBuilder builder, final Element portElem) throws FingerprintException {
         // id
         appendFirstValue(builder, DomUtils.getChildNodesByTagName(portElem, "id"));
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(portElem, "versionedComponentId"));
         appendFirstValue(builder, DomUtils.getChildNodesByTagName(portElem, "name"));
 
         final NodeList userAccessControlNodeList = DomUtils.getChildNodesByTagName(portElem, "userAccessControl");
@@ -471,13 +484,14 @@ public class FingerprintFactory {
 
     private StringBuilder addLabelFingerprint(final StringBuilder builder, final Element labelElem) {
         appendFirstValue(builder, DomUtils.getChildNodesByTagName(labelElem, "id"));
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(labelElem, "versionedComponentId"));
         appendFirstValue(builder, DomUtils.getChildNodesByTagName(labelElem, "value"));
         return builder;
     }
 
     private StringBuilder addRemoteProcessGroupFingerprint(final StringBuilder builder, final Element remoteProcessGroupElem) throws FingerprintException {
 
-        for (String tagName : new String[]{"id", "urls", "networkInterface", "timeout", "yieldPeriod",
+        for (String tagName : new String[] {"id", "versionedComponentId", "urls", "networkInterface", "timeout", "yieldPeriod",
                 "transportProtocol", "proxyHost", "proxyPort", "proxyUser", "proxyPassword"}) {
             final String value = getFirstValue(DomUtils.getChildNodesByTagName(remoteProcessGroupElem, tagName));
             if (isEncrypted(value)) {
@@ -544,7 +558,7 @@ public class FingerprintFactory {
     }
 
     private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) {
-        for (final String childName : new String[] {"id", "targetId", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) {
+        for (final String childName : new String[] {"id", "targetId", "versionedComponentId", "maxConcurrentTasks", "useCompression", "batchCount", "batchSize", "batchDuration"}) {
             appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName));
         }
 
@@ -555,6 +569,7 @@ public class FingerprintFactory {
     private StringBuilder addConnectionFingerprint(final StringBuilder builder, final Element connectionElem) throws FingerprintException {
         // id
         appendFirstValue(builder, DomUtils.getChildNodesByTagName(connectionElem, "id"));
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(connectionElem, "versionedComponentId"));
         // source id
         appendFirstValue(builder, DomUtils.getChildNodesByTagName(connectionElem, "sourceId"));
         // source group id
@@ -583,11 +598,13 @@ public class FingerprintFactory {
     private StringBuilder addFunnelFingerprint(final StringBuilder builder, final Element funnelElem) throws FingerprintException {
         // id
         appendFirstValue(builder, DomUtils.getChildNodesByTagName(funnelElem, "id"));
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(funnelElem, "versionedComponentId"));
         return builder;
     }
 
     private void addControllerServiceFingerprint(final StringBuilder builder, final ControllerServiceDTO dto) {
         builder.append(dto.getId());
+        builder.append(dto.getVersionedComponentId());
         builder.append(dto.getType());
         builder.append(dto.getName());
 
index ec32cc1..282c50d 100644 (file)
  */
 package org.apache.nifi.groups;
 
-import com.google.common.collect.Sets;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -29,6 +52,7 @@ import org.apache.nifi.authorization.Resource;
 import org.apache.nifi.authorization.resource.Authorizable;
 import org.apache.nifi.authorization.resource.ResourceFactory;
 import org.apache.nifi.authorization.resource.ResourceType;
+import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
@@ -40,6 +64,7 @@ import org.apache.nifi.connectable.LocalPort;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.connectable.Position;
 import org.apache.nifi.connectable.Positionable;
+import org.apache.nifi.connectable.Size;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
@@ -49,22 +74,58 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.ControllerServiceReference;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.StandardProcessContext;
 import org.apache.nifi.registry.ComponentVariableRegistry;
 import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.registry.flow.Bundle;
+import org.apache.nifi.registry.flow.ConnectableComponent;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.RemoteFlowCoordinates;
+import org.apache.nifi.registry.flow.StandardVersionControlInformation;
+import org.apache.nifi.registry.flow.UnknownResourceException;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedConnection;
+import org.apache.nifi.registry.flow.VersionedControllerService;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFunnel;
+import org.apache.nifi.registry.flow.VersionedLabel;
+import org.apache.nifi.registry.flow.VersionedPort;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.VersionedProcessor;
+import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.FlowComparator;
+import org.apache.nifi.registry.flow.diff.FlowComparison;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
 import org.apache.nifi.registry.variable.MutableVariableRegistry;
 import org.apache.nifi.remote.RemoteGroupPort;
 import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.ComponentIdGenerator;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.ReflectionUtils;
 import org.apache.nifi.web.Revision;
@@ -72,23 +133,6 @@ import org.apache.nifi.web.api.dto.TemplateDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;
-
 public final class StandardProcessGroup implements ProcessGroup {
 
     private final String id;
@@ -96,6 +140,8 @@ public final class StandardProcessGroup implements ProcessGroup {
     private final AtomicReference<String> name;
     private final AtomicReference<Position> position;
     private final AtomicReference<String> comments;
+    private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
+    private final AtomicReference<StandardVersionControlInformation> versionControlInfo = new AtomicReference<>();
 
     private final StandardProcessScheduler scheduler;
     private final ControllerServiceProvider controllerServiceProvider;
@@ -782,7 +828,6 @@ public final class StandardProcessGroup implements ProcessGroup {
 
             removed = true;
             LOG.info("{} removed from flow", processor);
-
         } finally {
             if (removed) {
                 try {
@@ -1935,7 +1980,6 @@ public final class StandardProcessGroup implements ProcessGroup {
 
             removed = true;
             LOG.info("{} removed from {}", service, this);
-
         } finally {
             if (removed) {
                 try {
@@ -2234,7 +2278,7 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     @Override
     public Set<Positionable> findAllPositionables() {
-        Set<Positionable> positionables = Sets.newHashSet();
+        final Set<Positionable> positionables = new HashSet<>();
         positionables.addAll(findAllConnectables(this, true));
         List<ProcessGroup> allProcessGroups = findAllProcessGroups();
         positionables.addAll(allProcessGroups);
@@ -2371,7 +2415,7 @@ public final class StandardProcessGroup implements ProcessGroup {
                 connection.verifyCanDelete();
             }
 
-            for(final ControllerServiceNode cs : controllerServices.values()) {
+            for (final ControllerServiceNode cs : controllerServices.values()) {
                 cs.verifyCanDelete();
             }
 
@@ -2647,6 +2691,31 @@ public final class StandardProcessGroup implements ProcessGroup {
     }
 
     @Override
+    public Optional<String> getVersionedComponentId() {
+        return Optional.ofNullable(versionedComponentId.get());
+    }
+
+    @Override
+    public void setVersionedComponentId(final String componentId) {
+        writeLock.lock();
+        try {
+            final String currentId = versionedComponentId.get();
+
+            if (currentId == null) {
+                versionedComponentId.set(componentId);
+            } else if (currentId.equals(componentId)) {
+                return;
+            } else if (componentId == null) {
+                versionedComponentId.set(null);
+            } else {
+                throw new IllegalStateException(this + " is already under version control with a different Versioned Component ID");
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    @Override
     public Set<ConfiguredComponent> getComponentsAffectedByVariable(final String variableName) {
         final Set<ConfiguredComponent> affected = new HashSet<>();
 
@@ -2736,4 +2805,1072 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
     }
 
+    @Override
+    public VersionControlInformation getVersionControlInformation() {
+        return versionControlInfo.get();
+    }
+
+    @Override
+    public void setVersionControlInformation(final VersionControlInformation versionControlInformation, final Map<String, String> versionedComponentIds) {
+        final StandardVersionControlInformation svci = new StandardVersionControlInformation(versionControlInformation.getRegistryIdentifier(),
+            versionControlInformation.getBucketIdentifier(),
+            versionControlInformation.getFlowIdentifier(),
+            versionControlInformation.getVersion(),
+            versionControlInformation.getFlowSnapshot(),
+            versionControlInformation.getModified().orElse(null),
+            versionControlInformation.getCurrent().orElse(null)) {
+
+            @Override
+            public Optional<Boolean> getModified() {
+                return StandardProcessGroup.this.isModified();
+            }
+        };
+
+        writeLock.lock();
+        try {
+            updateVersionedComponentIds(this, versionedComponentIds);
+            this.versionControlInfo.set(svci);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    private void updateVersionedComponentIds(final ProcessGroup processGroup, final Map<String, String> versionedComponentIds) {
+        if (versionedComponentIds == null || versionedComponentIds.isEmpty()) {
+            return;
+        }
+
+        processGroup.setVersionedComponentId(versionedComponentIds.get(processGroup.getIdentifier()));
+
+        processGroup.getConnections().stream()
+            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+        processGroup.getProcessors().stream()
+            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+        processGroup.getInputPorts().stream()
+            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+        processGroup.getOutputPorts().stream()
+            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+        processGroup.getLabels().stream()
+            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+        processGroup.getFunnels().stream()
+            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+        processGroup.getControllerServices(false).stream()
+            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+
+        processGroup.getRemoteProcessGroups().stream()
+            .forEach(rpg -> {
+                rpg.setVersionedComponentId(versionedComponentIds.get(rpg.getIdentifier()));
+
+                rpg.getInputPorts().stream()
+                    .forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier())));
+
+                rpg.getOutputPorts().stream()
+                    .forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier())));
+            });
+
+        processGroup.getProcessGroups().stream()
+            .forEach(childGroup -> updateVersionedComponentIds(childGroup, versionedComponentIds));
+    }
+
+
+    @Override
+    public void synchronizeWithFlowRegistry(final FlowRegistryClient flowRegistryClient) {
+        final StandardVersionControlInformation vci = versionControlInfo.get();
+        if (vci == null) {
+            return;
+        }
+
+        final String registryId = vci.getRegistryIdentifier();
+        final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
+        if (flowRegistry == null) {
+            LOG.error("Unable to synchronize {} with Flow Registry because Process Group was placed under Version Control using Flow Registry "
+                + "with identifier {} but cannot find any Flow Registry with this identifier", this, registryId);
+            return;
+        }
+
+        try {
+            final int latestVersion = flowRegistry.getLatestVersion(vci.getBucketIdentifier(), vci.getFlowIdentifier());
+
+            if (latestVersion == vci.getVersion()) {
+                LOG.debug("{} is currently at the most recent version ({}) of the flow that is under Version Control", this, latestVersion);
+                vci.setCurrent(true);
+            } else {
+                vci.setCurrent(false);
+                LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}",
+                    new Object[] {this, vci.getVersion(), latestVersion});
+            }
+        } catch (final IOException | UnknownResourceException e) {
+            LOG.error("Failed to synchronize {} with Flow Registry because could not determine the most recent version of the Flow in the Flow Registry", this, e);
+        }
+
+        final VersionedProcessGroup snapshot = vci.getFlowSnapshot();
+        if (snapshot == null) {
+            // We have not yet obtained the snapshot from the Flow Registry, so we need to request the snapshot of our local version of the flow from the Flow Registry.
+            // This allows us to know whether or not the flow has been modified since it was last synced with the Flow Registry.
+            try {
+                final VersionedFlowSnapshot registrySnapshot = flowRegistry.getFlowContents(vci.getBucketIdentifier(), vci.getFlowIdentifier(), vci.getVersion());
+                final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents();
+                vci.setFlowSnapshot(registryFlow);
+            } catch (final IOException | UnknownResourceException e) {
+                LOG.error("Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}",
+                    new Object[] {this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier()}, e);
+                return;
+            }
+        }
+    }
+
+
+    @Override
+    public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty) {
+        writeLock.lock();
+        try {
+            verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); // TODO: Should perform more verification... verifyCanDelete, verifyCanUpdate, etc. Recursively if child is under VC also
+
+            final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
+            final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient());
+
+            final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
+            final ComparableDataFlow remoteFlow = new StandardComparableDataFlow("Remote Flow", proposedSnapshot.getFlowContents());
+
+            final FlowComparator flowComparator = new StandardFlowComparator(localFlow, remoteFlow);
+            final FlowComparison flowComparison = flowComparator.compare();
+
+            final Set<String> updatedVersionedComponentIds = flowComparison.getDifferences().stream()
+                .filter(diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED)
+                .map(diff -> diff.getComponentA() == null ? diff.getComponentB().getIdentifier() : diff.getComponentA().getIdentifier())
+                .collect(Collectors.toSet());
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences());
+            } else {
+                // TODO: Remove the actual differences from the info level log. It can be extremely verbose. Is here only for testing purposes becuase it's much more convenient
+                // than having to remember to enable DEBUG level logging every time a full build is done.
+                LOG.info("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences());
+            }
+
+            updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false);
+        } catch (final ProcessorInstantiationException pie) {
+            throw new RuntimeException(pie);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+
+    private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed,
+        final Set<String> updatedVersionedComponentIds, final boolean updatePosition) throws ProcessorInstantiationException {
+
+        group.setComments(proposed.getComments());
+        group.setName(proposed.getName());
+        if (updatePosition && proposed.getPosition() != null) {
+            group.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
+        }
+
+        // Determine which variables have been added/removed and add/remove them from this group's variable registry.
+        // We don't worry about if a variable value has changed, because variables are designed to be 'environment specific.'
+        // As a result, once imported, we won't update variables to match the remote flow, but we will add any missing variables
+        // and remove any variables that are no longer part of the remote flow.
+        final Set<String> existingVariableNames = group.getVariableRegistry().getVariableMap().keySet().stream()
+            .map(VariableDescriptor::getName)
+            .collect(Collectors.toSet());
+
+        final Set<String> variablesRemoved = new HashSet<>(existingVariableNames);
+        variablesRemoved.removeAll(proposed.getVariables().keySet());
+        final Map<String, String> updatedVariableMap = new HashMap<>();
+        variablesRemoved.forEach(var -> updatedVariableMap.put(var, null));
+
+        // If any new variables exist in the proposed flow, add those to the variable registry.
+        for (final Map.Entry<String, String> entry : proposed.getVariables().entrySet()) {
+            if (!existingVariableNames.contains(entry.getKey())) {
+                updatedVariableMap.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        group.setVariables(updatedVariableMap);
+
+        final RemoteFlowCoordinates remoteCoordinates = proposed.getRemoteFlowCoordinates();
+        if (remoteCoordinates != null) {
+            final String registryId = flowController.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl());
+            final String bucketId = remoteCoordinates.getBucketId();
+            final String flowId = remoteCoordinates.getFlowId();
+            final int version = remoteCoordinates.getVersion();
+
+            final VersionControlInformation vci = new StandardVersionControlInformation(registryId, bucketId, flowId, version, proposed, false, true);
+            group.setVersionControlInformation(vci, Collections.emptyMap());
+        }
+
+        // Child groups
+        // TODO: Need to take into account if child group is under version control pointing to a different Versioned Flow and if so need to handle it differently.
+        final Map<String, ProcessGroup> childGroupsByVersionedId = group.getProcessGroups().stream()
+            .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
+        final Set<String> childGroupsRemoved = new HashSet<>(childGroupsByVersionedId.keySet());
+
+        for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
+            final ProcessGroup childGroup = childGroupsByVersionedId.get(proposedChildGroup.getIdentifier());
+
+            if (childGroup == null) {
+                final ProcessGroup added = addProcessGroup(proposedChildGroup, componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else {
+                updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true);
+                LOG.info("Updated {}", childGroup);
+            }
+
+            childGroupsRemoved.remove(proposedChildGroup.getIdentifier());
+        }
+
+
+        // Controller Services
+        final Map<String, ControllerServiceNode> servicesByVersionedId = group.getControllerServices(false).stream()
+            .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
+        final Set<String> controllerServicesRemoved = new HashSet<>(servicesByVersionedId.keySet());
+
+        for (final VersionedControllerService proposedService : proposed.getControllerServices()) {
+            final ControllerServiceNode service = servicesByVersionedId.get(proposedService.getIdentifier());
+            if (service == null) {
+                final ControllerServiceNode added = addControllerService(proposedService, componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
+                updateControllerService(service, proposedService);
+                LOG.info("Updated {}", service);
+            }
+
+            controllerServicesRemoved.remove(proposedService.getIdentifier());
+        }
+
+
+        // Funnels
+        final Map<String, Funnel> funnelsByVersionedId = group.getFunnels().stream()
+            .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
+        final Set<String> funnelsRemoved = new HashSet<>(funnelsByVersionedId.keySet());
+
+        for (final VersionedFunnel proposedFunnel : proposed.getFunnels()) {
+            final Funnel funnel = funnelsByVersionedId.get(proposedFunnel.getIdentifier());
+            if (funnel == null) {
+                final Funnel added = addFunnel(proposedFunnel, componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if (updatedVersionedComponentIds.contains(proposedFunnel.getIdentifier())) {
+                updateFunnel(funnel, proposedFunnel);
+                LOG.info("Updated {}", funnel);
+            } else {
+                funnel.setPosition(new Position(proposedFunnel.getPosition().getX(), proposedFunnel.getPosition().getY()));
+            }
+
+            funnelsRemoved.remove(proposedFunnel.getIdentifier());
+        }
+
+
+        // Input Ports
+        final Map<String, Port> inputPortsByVersionedId = group.getInputPorts().stream()
+            .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
+        final Set<String> inputPortsRemoved = new HashSet<>(inputPortsByVersionedId.keySet());
+
+        for (final VersionedPort proposedPort : proposed.getInputPorts()) {
+            final Port port = inputPortsByVersionedId.get(proposedPort.getIdentifier());
+            if (port == null) {
+                final Port added = addInputPort(proposedPort, componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
+                updatePort(port, proposedPort);
+                LOG.info("Updated {}", port);
+            } else {
+                port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
+            }
+
+            inputPortsRemoved.remove(proposedPort.getIdentifier());
+        }
+
+        // Output Ports
+        final Map<String, Port> outputPortsByVersionedId = group.getOutputPorts().stream()
+            .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
+        final Set<String> outputPortsRemoved = new HashSet<>(outputPortsByVersionedId.keySet());
+
+        for (final VersionedPort proposedPort : proposed.getOutputPorts()) {
+            final Port port = outputPortsByVersionedId.get(proposedPort.getIdentifier());
+            if (port == null) {
+                final Port added = addOutputPort(proposedPort, componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if (updatedVersionedComponentIds.contains(proposedPort.getIdentifier())) {
+                updatePort(port, proposedPort);
+                LOG.info("Updated {}", port);
+            } else {
+                port.setPosition(new Position(proposedPort.getPosition().getX(), proposedPort.getPosition().getY()));
+            }
+
+            outputPortsRemoved.remove(proposedPort.getIdentifier());
+        }
+
+
+        // Labels
+        final Map<String, Label> labelsByVersionedId = group.getLabels().stream()
+            .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
+        final Set<String> labelsRemoved = new HashSet<>(labelsByVersionedId.keySet());
+
+        for (final VersionedLabel proposedLabel : proposed.getLabels()) {
+            final Label label = labelsByVersionedId.get(proposedLabel.getIdentifier());
+            if (label == null) {
+                final Label added = addLabel(proposedLabel, componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if (updatedVersionedComponentIds.contains(proposedLabel.getIdentifier())) {
+                updateLabel(label, proposedLabel);
+                LOG.info("Updated {}", label);
+            } else {
+                label.setPosition(new Position(proposedLabel.getPosition().getX(), proposedLabel.getPosition().getY()));
+            }
+
+            labelsRemoved.remove(proposedLabel.getIdentifier());
+        }
+
+
+        // Processors
+        final Map<String, ProcessorNode> processorsByVersionedId = group.getProcessors().stream()
+            .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
+        final Set<String> processorsRemoved = new HashSet<>(processorsByVersionedId.keySet());
+
+        for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
+            final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
+            if (processor == null) {
+                final ProcessorNode added = addProcessor(proposedProcessor, componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
+                updateProcessor(processor, proposedProcessor);
+                LOG.info("Updated {}", processor);
+            } else {
+                processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
+            }
+
+            processorsRemoved.remove(proposedProcessor.getIdentifier());
+        }
+
+
+        // Remote Groups
+        final Map<String, RemoteProcessGroup> rpgsByVersionedId = group.getRemoteProcessGroups().stream()
+            .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
+        final Set<String> rpgsRemoved = new HashSet<>(rpgsByVersionedId.keySet());
+
+        for (final VersionedRemoteProcessGroup proposedRpg : proposed.getRemoteProcessGroups()) {
+            final RemoteProcessGroup rpg = rpgsByVersionedId.get(proposedRpg.getIdentifier());
+            if (rpg == null) {
+                final RemoteProcessGroup added = addRemoteProcessGroup(proposedRpg, componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if (updatedVersionedComponentIds.contains(proposedRpg.getIdentifier())) {
+                updateRemoteProcessGroup(rpg, proposedRpg);
+                LOG.info("Updated {}", rpg);
+            } else {
+                rpg.setPosition(new Position(proposedRpg.getPosition().getX(), proposedRpg.getPosition().getY()));
+            }
+
+            rpgsRemoved.remove(proposedRpg.getIdentifier());
+        }
+
+
+        // Connections
+        final Map<String, Connection> connectionsByVersionedId = group.getConnections().stream()
+            .collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
+        final Set<String> connectionsRemoved = new HashSet<>(connectionsByVersionedId.keySet());
+
+        for (final VersionedConnection proposedConnection : proposed.getConnections()) {
+            final Connection connection = connectionsByVersionedId.get(proposedConnection.getIdentifier());
+            if (connection == null) {
+                final Connection added = addConnection(proposedConnection, componentIdSeed);
+                LOG.info("Added {} to {}", added, this);
+            } else if (!connection.getSource().isRunning() && !connection.getDestination().isRunning()) {
+                // If the connection needs to be updated, then the source and destination will already have
+                // been stopped (else, the validation above would fail). So if the source or the destination is running,
+                // then we know that we don't need to update the connection.
+                updateConnection(connection, proposedConnection);
+                LOG.info("Updated {}", connection);
+            }
+
+            connectionsRemoved.remove(proposedConnection.getIdentifier());
+        }
+
+        // Remove components that exist in the local flow but not the remote flow.
+
+        // Connections must be the first thing to remove, not the last. Otherwise, we will fail
+        // to remove a component if it has a connection going to it!
+        for (final String removedVersionedId : connectionsRemoved) {
+            final Connection connection = connectionsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", connection, group);
+            group.removeConnection(connection);
+        }
+
+        for (final String removedVersionedId : controllerServicesRemoved) {
+            final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", service, group);
+            group.removeControllerService(service);
+        }
+
+        for (final String removedVersionedId : funnelsRemoved) {
+            final Funnel funnel = funnelsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", funnel, group);
+            group.removeFunnel(funnel);
+        }
+
+        for (final String removedVersionedId : inputPortsRemoved) {
+            final Port port = inputPortsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", port, group);
+            group.removeInputPort(port);
+        }
+
+        for (final String removedVersionedId : outputPortsRemoved) {
+            final Port port = outputPortsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", port, group);
+            group.removeOutputPort(port);
+        }
+
+        for (final String removedVersionedId : labelsRemoved) {
+            final Label label = labelsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", label, group);
+            group.removeLabel(label);
+        }
+
+        for (final String removedVersionedId : processorsRemoved) {
+            final ProcessorNode processor = processorsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", processor, group);
+            group.removeProcessor(processor);
+        }
+
+        for (final String removedVersionedId : rpgsRemoved) {
+            final RemoteProcessGroup rpg = rpgsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", rpg, group);
+            group.removeRemoteProcessGroup(rpg);
+        }
+
+        for (final String removedVersionedId : childGroupsRemoved) {
+            final ProcessGroup childGroup = childGroupsByVersionedId.get(removedVersionedId);
+            LOG.info("Removing {} from {}", childGroup, group);
+            group.removeProcessGroup(childGroup);
+        }
+    }
+
+    protected String generateUuid(final String componentIdSeed) {
+        UUID uuid;
+        if (componentIdSeed == null) {
+            uuid = ComponentIdGenerator.generateId();
+        } else {
+            try {
+                UUID seedId = UUID.fromString(componentIdSeed);
+                uuid = new UUID(seedId.getMostSignificantBits(), componentIdSeed.hashCode());
+            } catch (Exception e) {
+                LOG.warn("Provided 'seed' does not represent UUID. Will not be able to extract most significant bits for ID generation.");
+                uuid = UUID.nameUUIDFromBytes(componentIdSeed.getBytes(StandardCharsets.UTF_8));
+            }
+        }
+
+        return uuid.toString();
+    }
+
+
+    private ProcessGroup addProcessGroup(final VersionedProcessGroup proposed, final String componentIdSeed) throws ProcessorInstantiationException {
+        final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed));
+        group.setVersionedComponentId(proposed.getIdentifier());
+        addProcessGroup(group);
+        updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true);
+        return group;
+    }
+
+    private void updateConnection(final Connection connection, final VersionedConnection proposed) {
+        connection.setBendPoints(proposed.getBends().stream()
+            .map(pos -> new Position(pos.getX(), pos.getY()))
+            .collect(Collectors.toList()));
+
+        connection.setDestination(getConnectable(proposed.getDestination()));
+        connection.setLabelIndex(proposed.getLabelIndex());
+        connection.setName(proposed.getName());
+        connection.setRelationships(proposed.getSelectedRelationships().stream()
+            .map(name -> new Relationship.Builder().name(name).build())
+            .collect(Collectors.toSet()));
+        connection.setZIndex(proposed.getzIndex());
+
+        final FlowFileQueue queue = connection.getFlowFileQueue();
+        queue.setBackPressureDataSizeThreshold(proposed.getBackPressureDataSizeThreshold());
+        queue.setBackPressureObjectThreshold(proposed.getBackPressureObjectThreshold());
+        queue.setFlowFileExpiration(proposed.getFlowFileExpiration());
+
+        final List<FlowFilePrioritizer> prioritizers = proposed.getPrioritizers().stream()
+            .map(prioritizerName -> {
+                try {
+                    return flowController.createPrioritizer(prioritizerName);
+                } catch (final Exception e) {
+                    throw new RuntimeException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier());
+                }
+            })
+            .collect(Collectors.toList());
+
+        queue.setPriorities(prioritizers);
+    }
+
+    private Connection addConnection(final VersionedConnection proposed, final String componentIdSeed) {
+        final Connectable source = getConnectable(proposed.getSource());
+        if (source == null) {
+            throw new IllegalArgumentException("Connection has a source with identifier " + proposed.getIdentifier()
+                + " but no component could be found in the Process Group with a corresponding identifier");
+        }
+
+        final Connectable destination = getConnectable(proposed.getDestination());
+        if (destination == null) {
+            throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getIdentifier()
+                + " but no component could be found in the Process Group with a corresponding identifier");
+        }
+
+        final Connection connection = flowController.createConnection(generateUuid(componentIdSeed), proposed.getName(), source, destination, proposed.getSelectedRelationships());
+        connection.setVersionedComponentId(proposed.getIdentifier());
+        addConnection(connection);
+        updateConnection(connection, proposed);
+
+        return connection;
+    }
+
+    private Connectable getConnectable(final ConnectableComponent connectableComponent) {
+        final String id = connectableComponent.getId();
+
+        switch (connectableComponent.getType()) {
+            case FUNNEL:
+                return getFunnels().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny()
+                    .orElse(null);
+            case INPUT_PORT:
+                return getInputPorts().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny()
+                    .orElse(null);
+            case OUTPUT_PORT:
+                return getOutputPorts().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny()
+                    .orElse(null);
+            case PROCESSOR:
+                return getProcessors().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny()
+                    .orElse(null);
+            case REMOTE_INPUT_PORT: {
+                final String rpgId = connectableComponent.getGroupId();
+                final Optional<RemoteProcessGroup> rpgOption = getRemoteProcessGroups().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny();
+
+                if (!rpgOption.isPresent()) {
+                    throw new IllegalArgumentException("Connection refers to a Port with ID " + id + " within Remote Process Group with ID "
+                        + rpgId + " but could not find a Remote Process Group corresponding to that ID");
+                }
+
+                final RemoteProcessGroup rpg = rpgOption.get();
+                return rpg.getInputPorts().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny()
+                    .orElse(null);
+            }
+            case REMOTE_OUTPUT_PORT: {
+                final String rpgId = connectableComponent.getGroupId();
+                final Optional<RemoteProcessGroup> rpgOption = getRemoteProcessGroups().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny();
+
+                if (!rpgOption.isPresent()) {
+                    throw new IllegalArgumentException("Connection refers to a Port with ID " + id + " within Remote Process Group with ID "
+                        + rpgId + " but could not find a Remote Process Group corresponding to that ID");
+                }
+
+                final RemoteProcessGroup rpg = rpgOption.get();
+                return rpg.getOutputPorts().stream()
+                    .filter(component -> component.getVersionedComponentId().isPresent())
+                    .filter(component -> id.equals(component.getVersionedComponentId().get()))
+                    .findAny()
+                    .orElse(null);
+            }
+        }
+
+        return null;
+    }
+
+    private void updateControllerService(final ControllerServiceNode service, final VersionedControllerService proposed) {
+        service.setAnnotationData(proposed.getAnnotationData());
+        service.setComments(proposed.getComments());
+        service.setName(proposed.getName());
+        service.setProperties(populatePropertiesMap(service.getProperties(), proposed.getProperties()));
+
+        if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) {
+            final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
+            final List<PropertyDescriptor> descriptors = new ArrayList<>(service.getProperties().keySet());
+            final Set<URL> additionalUrls = service.getAdditionalClasspathResources(descriptors);
+            flowController.reload(service, proposed.getType(), newBundleCoordinate, additionalUrls);
+        }
+    }
+
+    private boolean isEqual(final BundleCoordinate coordinate, final Bundle bundle) {
+        if (!bundle.getGroup().equals(coordinate.getGroup())) {
+            return false;
+        }
+
+        if (!bundle.getArtifact().equals(coordinate.getId())) {
+            return false;
+        }
+
+        if (!bundle.getVersion().equals(coordinate.getVersion())) {
+            return false;
+        }
+
+        return true;
+    }
+
+    private BundleCoordinate toCoordinate(final Bundle bundle) {
+        return new BundleCoordinate(bundle.getGroup(), bundle.getArtifact(), bundle.getVersion());
+    }
+
+    private ControllerServiceNode addControllerService(final VersionedControllerService proposed, final String componentIdSeed) {
+        final String type = proposed.getType();
+        final String id = generateUuid(componentIdSeed);
+
+        final Bundle bundle = proposed.getBundle();
+        final BundleCoordinate coordinate = toCoordinate(bundle);
+        final boolean firstTimeAdded = true;
+        final Set<URL> additionalUrls = Collections.emptySet();
+
+        final ControllerServiceNode newService = flowController.createControllerService(type, id, coordinate, additionalUrls, firstTimeAdded);
+        newService.setVersionedComponentId(proposed.getIdentifier());
+
+        addControllerService(newService);
+        updateControllerService(newService, proposed);
+
+        return newService;
+    }
+
+    private void updateFunnel(final Funnel funnel, final VersionedFunnel proposed) {
+        funnel.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
+    }
+
+    private Funnel addFunnel(final VersionedFunnel proposed, final String componentIdSeed) {
+        final Funnel funnel = flowController.createFunnel(generateUuid(componentIdSeed));
+        funnel.setVersionedComponentId(proposed.getIdentifier());
+        addFunnel(funnel);
+        updateFunnel(funnel, proposed);
+
+        return funnel;
+    }
+
+    private void updatePort(final Port port, final VersionedPort proposed) {
+        port.setComments(proposed.getComments());
+        port.setName(proposed.getName());
+        port.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
+    }
+
+    private Port addInputPort(final VersionedPort proposed, final String componentIdSeed) {
+        final Port port = flowController.createLocalInputPort(generateUuid(componentIdSeed), proposed.getName());
+        port.setVersionedComponentId(proposed.getIdentifier());
+        addInputPort(port);
+        updatePort(port, proposed);
+
+        return port;
+    }
+
+    private Port addOutputPort(final VersionedPort proposed, final String componentIdSeed) {
+        final Port port = flowController.createLocalInputPort(generateUuid(componentIdSeed), proposed.getName());
+        port.setVersionedComponentId(proposed.getIdentifier());
+        addOutputPort(port);
+        updatePort(port, proposed);
+
+        return port;
+    }
+
+    private Label addLabel(final VersionedLabel proposed, final String componentIdSeed) {
+        final Label label = flowController.createLabel(generateUuid(componentIdSeed), proposed.getLabel());
+        label.setVersionedComponentId(proposed.getIdentifier());
+        addLabel(label);
+        updateLabel(label, proposed);
+
+        return label;
+    }
+
+    private void updateLabel(final Label label, final VersionedLabel proposed) {
+        label.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
+        label.setSize(new Size(proposed.getWidth(), proposed.getHeight()));
+        label.setStyle(proposed.getStyle());
+        label.setValue(proposed.getLabel());
+    }
+
+    private ProcessorNode addProcessor(final VersionedProcessor proposed, final String componentIdSeed) throws ProcessorInstantiationException {
+        final BundleCoordinate coordinate = toCoordinate(proposed.getBundle());
+        final ProcessorNode procNode = flowController.createProcessor(proposed.getType(), generateUuid(componentIdSeed), coordinate, true);
+        procNode.setVersionedComponentId(proposed.getIdentifier());
+
+        addProcessor(procNode);
+        updateProcessor(procNode, proposed);
+
+        return procNode;
+    }
+
+    private void updateProcessor(final ProcessorNode processor, final VersionedProcessor proposed) throws ProcessorInstantiationException {
+        processor.setAnnotationData(proposed.getAnnotationData());
+        processor.setBulletinLevel(LogLevel.valueOf(proposed.getBulletinLevel()));
+        processor.setComments(proposed.getComments());
+        processor.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
+        processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode()));
+        processor.setName(proposed.getName());
+        processor.setPenalizationPeriod(proposed.getPenaltyDuration());
+        processor.setProperties(populatePropertiesMap(processor.getProperties(), proposed.getProperties()));
+        processor.setRunDuration(proposed.getRunDurationMillis(), TimeUnit.MILLISECONDS);
+        processor.setScheduldingPeriod(proposed.getSchedulingPeriod());
+        processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
+        processor.setStyle(proposed.getStyle());
+        processor.setYieldPeriod(proposed.getYieldDuration());
+        processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
+
+        processor.setAutoTerminatedRelationships(proposed.getAutoTerminatedRelationships().stream()
+            .map(relName -> processor.getRelationship(relName))
+            .collect(Collectors.toSet()));
+
+        if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
+            final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
+            final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet());
+            final Set<URL> additionalUrls = processor.getAdditionalClasspathResources(descriptors);
+            flowController.reload(processor, proposed.getType(), newBundleCoordinate, additionalUrls);
+        }
+    }
+
+    private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties) {
+        final Map<String, String> fullPropertyMap = new HashMap<>();
+        for (final PropertyDescriptor property : currentProperties.keySet()) {
+            fullPropertyMap.put(property.getName(), null);
+        }
+
+        fullPropertyMap.putAll(proposedProperties);
+        return fullPropertyMap;
+    }
+
+    private RemoteProcessGroup addRemoteProcessGroup(final VersionedRemoteProcessGroup proposed, final String componentIdSeed) {
+        final RemoteProcessGroup rpg = flowController.createRemoteProcessGroup(generateUuid(componentIdSeed), proposed.getTargetUris());
+        rpg.setVersionedComponentId(proposed.getIdentifier());
+
+        addRemoteProcessGroup(rpg);
+        updateRemoteProcessGroup(rpg, proposed);
+
+        return rpg;
+    }
+
+    private void updateRemoteProcessGroup(final RemoteProcessGroup rpg, final VersionedRemoteProcessGroup proposed) {
+        rpg.setComments(proposed.getComments());
+        rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout());
+        rpg.setInputPorts(proposed.getInputPorts().stream()
+            .map(port -> createPortDescriptor(port))
+            .collect(Collectors.toSet()));
+        rpg.setName(proposed.getName());
+        rpg.setNetworkInterface(proposed.getLocalNetworkInterface());
+        rpg.setOutputPorts(proposed.getOutputPorts().stream()
+            .map(port -> createPortDescriptor(port))
+            .collect(Collectors.toSet()));
+        rpg.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
+        rpg.setProxyHost(proposed.getProxyHost());
+        rpg.setProxyPort(proposed.getProxyPort());
+        rpg.setProxyUser(proposed.getProxyUser());
+        rpg.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(proposed.getTransportProtocol()));
+        rpg.setYieldDuration(proposed.getYieldDuration());
+    }
+
+    private RemoteProcessGroupPortDescriptor createPortDescriptor(final VersionedRemoteGroupPort proposed) {
+        final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
+        descriptor.setVersionedComponentId(proposed.getIdentifier());
+        descriptor.setBatchCount(proposed.getBatchSize().getCount());
+        descriptor.setBatchDuration(proposed.getBatchSize().getDuration());
+        descriptor.setBatchSize(proposed.getBatchSize().getSize());
+        descriptor.setComments(proposed.getComments());
+        descriptor.setConcurrentlySchedulableTaskCount(proposed.getConcurrentlySchedulableTaskCount());
+        descriptor.setGroupId(proposed.getGroupId());
+        descriptor.setId(UUID.randomUUID().toString()); // TODO: Need to address this issue of port id's
+        descriptor.setName(proposed.getName());
+        descriptor.setUseCompression(proposed.isUseCompression());
+        return descriptor;
+    }
+
+
+    public Optional<Boolean> isModified() {
+        final StandardVersionControlInformation vci = versionControlInfo.get();
+
+        // If this group is not under version control, then we need to notify the parent
+        // group (if any) that a modification has taken place. Otherwise, we need to
+        // compare the current the flow with the 'versioned snapshot' of the flow in order
+        // to determine if the flows are different.
+        // We cannot simply say 'if something changed then this flow is different than the versioned snapshot'
+        // because if we do this, and a user adds a processor then subsequently removes it, then the logic would
+        // say that the flow is modified. There would be no way to ever go back to the flow not being modified.
+        // So we have to perform a diff of the flows and see if they are the same.
+        if (vci == null) {
+            return Optional.of(Boolean.FALSE);
+        }
+
+        if (vci.getFlowSnapshot() == null) {
+            // we haven't retrieved the flow from the Flow Registry yet, so we don't know if it's been modified.
+            // As a result, we will just return an empty optional
+            return Optional.empty();
+        }
+
+        final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
+        final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient());
+
+        final ComparableDataFlow currentFlow = new ComparableDataFlow() {
+            @Override
+            public VersionedProcessGroup getContents() {
+                return versionedGroup;
+            }
+
+            @Override
+            public String getName() {
+                return "Local Flow";
+            }
+        };
+
+        final ComparableDataFlow snapshotFlow = new ComparableDataFlow() {
+            @Override
+            public VersionedProcessGroup getContents() {
+                return vci.getFlowSnapshot();
+            }
+
+            @Override
+            public String getName() {
+                return "Flow Under Version Control";
+            }
+        };
+
+        final FlowComparator flowComparator = new StandardFlowComparator(currentFlow, snapshotFlow);
+        final FlowComparison comparison = flowComparator.compare();
+        final Set<FlowDifference> differences = comparison.getDifferences();
+        final boolean modified = differences.stream()
+            .filter(diff -> diff.getDifferenceType() != DifferenceType.POSITION_CHANGED)
+            .filter(diff -> diff.getDifferenceType() != DifferenceType.STYLE_CHANGED)
+            .findAny()
+            .isPresent();
+
+        LOG.debug("There are {} differences between this flow and the versioned snapshot of this flow: {}", differences.size(), differences);
+        return Optional.of(modified);
+    }
+
+
+    @Override
+    public void verifyCanUpdate(final VersionedFlowSnapshot updatedFlow, final boolean verifyConnectionRemoval, final boolean verifyNotDirty) {
+        readLock.lock();
+        try {
+            final VersionControlInformation versionControlInfo = getVersionControlInformation();
+            if (versionControlInfo == null) {
+                throw new IllegalStateException("Cannot update the Version of the flow for " + this
+                    + " because the Process Group is not currently under Version Control");
+            }
+
+            if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) {
+                throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
+            }
+
+            if (verifyNotDirty) {
+                final Optional<Boolean> modifiedOption = versionControlInfo.getModified();
+                if (!modifiedOption.isPresent()) {
+                    throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow "
+                        + "has not yet been synchronized with the Flow Registry. The Process Group must be"
+                        + " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later");
+                }
+
+                if (Boolean.TRUE.equals(modifiedOption.get())) {
+                    throw new IllegalStateException("Cannot change the Version of the flow for " + this
+                        + " because the Process Group has been modified since it was last synchronized with the Flow Registry. The Process Group must be"
+                        + " restored to its original form before changing the version");
+                }
+            }
+
+            final VersionedProcessGroup flowContents = updatedFlow.getFlowContents();
+            if (verifyConnectionRemoval) {
+                // Determine which Connections have been removed.
+                final Map<String, Connection> removedConnectionByVersionedId = new HashMap<>();
+                findAllConnections().stream()
+                    .filter(conn -> conn.getVersionedComponentId().isPresent())
+                    .forEach(conn -> removedConnectionByVersionedId.put(conn.getVersionedComponentId().get(), conn));
+
+                final Set<String> proposedFlowConnectionIds = new HashSet<>();
+                findAllConnectionIds(flowContents, proposedFlowConnectionIds);
+
+                for (final String proposedConnectionId : proposedFlowConnectionIds) {
+                    removedConnectionByVersionedId.remove(proposedConnectionId);
+                }
+
+                // If any connection that was removed has data in it, throw an IllegalStateException
+                for (final Connection connection : removedConnectionByVersionedId.values()) {
+                    final FlowFileQueue flowFileQueue = connection.getFlowFileQueue();
+                    if (!flowFileQueue.isEmpty()) {
+                        throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the "
+                            + "proposed version does not contain "
+                            + connection + " and the connection currently has data in the queue.");
+                    }
+                }
+            }
+
+            // Determine which input ports were removed from this process group
+            final Map<String, Port> removedInputPortsByVersionId = new HashMap<>();
+            getInputPorts().stream()
+                .filter(port -> port.getVersionedComponentId().isPresent())
+                .forEach(port -> removedInputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
+            flowContents.getInputPorts().stream()
+                .map(VersionedPort::getIdentifier)
+                .forEach(id -> removedInputPortsByVersionId.remove(id));
+
+            // Ensure that there are no incoming connections for any Input Port that was removed.
+            for (final Port inputPort : removedInputPortsByVersionId.values()) {
+                final List<Connection> incomingConnections = inputPort.getIncomingConnections();
+                if (!incomingConnections.isEmpty()) {
+                    throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the proposed version does not contain the Input Port "
+                        + inputPort + " and the Input Port currently has an incoming connections");
+                }
+            }
+
+            // Determine which output ports were removed from this process group
+            final Map<String, Port> removedOutputPortsByVersionId = new HashMap<>();
+            getOutputPorts().stream()
+                .filter(port -> port.getVersionedComponentId().isPresent())
+                .forEach(port -> removedOutputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
+            flowContents.getOutputPorts().stream()
+                .map(VersionedPort::getIdentifier)
+                .forEach(id -> removedOutputPortsByVersionId.remove(id));
+
+            // Ensure that there are no outgoing connections for any Output Port that was removed.
+            for (final Port outputPort : removedOutputPortsByVersionId.values()) {
+                final Set<Connection> outgoingConnections = outputPort.getConnections();
+                if (!outgoingConnections.isEmpty()) {
+                    throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the proposed version does not contain the Output Port "
+                        + outputPort + " and the Output Port currently has an outgoing connections");
+                }
+            }
+
+            // Find any Process Groups that may have been deleted. If we find any Process Group that was deleted, and that Process Group
+            // has Templates, then we fail because the Templates have to be removed first.
+            final Map<String, VersionedProcessGroup> proposedProcessGroups = new HashMap<>();
+            findAllProcessGroups(updatedFlow.getFlowContents(), proposedProcessGroups);
+
+            for (final ProcessGroup childGroup : findAllProcessGroups()) {
+                if (childGroup.getTemplates().isEmpty()) {
+                    continue;
+                }
+
+                final Optional<String> versionedIdOption = childGroup.getVersionedComponentId();
+                if (!versionedIdOption.isPresent()) {
+                    continue;
+                }
+
+                final String versionedId = versionedIdOption.get();
+                if (!proposedProcessGroups.containsKey(versionedId)) {
+                    // Process Group was removed.
+                    throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the child " + childGroup
+                        + " that exists locally has one or more Templates, and the proposed flow does not contain this Process Group. "
+                        + "A Process Group cannot be deleted while it contains Templates. Please remove the Templates before attempting to chnage the version of the flow.");
+                }
+            }
+
+
+            // Ensure that all Processors are instantiate-able.
+            final Map<String, VersionedProcessor> proposedProcessors = new HashMap<>();
+            findAllProcessors(updatedFlow.getFlowContents(), proposedProcessors);
+
+            findAllProcessors().stream()
+                .filter(proc -> proc.getVersionedComponentId().isPresent())
+                .forEach(proc -> proposedProcessors.remove(proc.getVersionedComponentId().get()));
+
+            for (final VersionedProcessor processorToAdd : proposedProcessors.values()) {
+                final BundleCoordinate coordinate = toCoordinate(processorToAdd.getBundle());
+                try {
+                    flowController.createProcessor(processorToAdd.getType(), UUID.randomUUID().toString(), coordinate, false);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("Unable to create Processor of type " + processorToAdd.getType(), e);
+                }
+            }
+
+            // Ensure that all Controller Services are instantiate-able.
+            final Map<String, VersionedControllerService> proposedServices = new HashMap<>();
+            findAllControllerServices(updatedFlow.getFlowContents(), proposedServices);
+
+            findAllControllerServices().stream()
+                .filter(service -> service.getVersionedComponentId().isPresent())
+                .forEach(service -> proposedServices.remove(service.getVersionedComponentId().get()));
+
+            for (final VersionedControllerService serviceToAdd : proposedServices.values()) {
+                final BundleCoordinate coordinate = toCoordinate(serviceToAdd.getBundle());
+                try {
+                    flowController.createControllerService(serviceToAdd.getType(), UUID.randomUUID().toString(), coordinate, Collections.emptySet(), false);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException("Unable to create Controller Service of type " + serviceToAdd.getType(), e);
+                }
+            }
+
+            // Ensure that all Prioritizers are instantiate-able.
+            final Map<String, VersionedConnection> proposedConnections = new HashMap<>();
+            findAllConnections(updatedFlow.getFlowContents(), proposedConnections);
+
+            findAllConnections().stream()
+                .filter(conn -> conn.getVersionedComponentId().isPresent())
+                .forEach(conn -> proposedConnections.remove(conn.getVersionedComponentId().get()));
+
+            for (final VersionedConnection connectionToAdd : proposedConnections.values()) {
+                for (final String prioritizerType : connectionToAdd.getPrioritizers()) {
+                    try {
+                        flowController.createPrioritizer(prioritizerType);
+                    } catch (Exception e) {
+                        throw new IllegalArgumentException("Unable to create Prioritizer of type " + prioritizerType, e);
+                    }
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    private void findAllConnectionIds(final VersionedProcessGroup group, final Set<String> ids) {
+        for (final VersionedConnection connection : group.getConnections()) {
+            ids.add(connection.getIdentifier());
+        }
+
+        for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
+            findAllConnectionIds(childGroup, ids);
+        }
+    }
+
+    private void findAllProcessors(final VersionedProcessGroup group, final Map<String, VersionedProcessor> map) {
+        for (final VersionedProcessor processor : group.getProcessors()) {
+            map.put(processor.getIdentifier(), processor);
+        }
+
+        for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
+            findAllProcessors(childGroup, map);
+        }
+    }
+
+    private void findAllControllerServices(final VersionedProcessGroup group, final Map<String, VersionedControllerService> map) {
+        for (final VersionedControllerService service : group.getControllerServices()) {
+            map.put(service.getIdentifier(), service);
+        }
+
+        for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
+            findAllControllerServices(childGroup, map);
+        }
+    }
+
+    private void findAllConnections(final VersionedProcessGroup group, final Map<String, VersionedConnection> map) {
+        for (final VersionedConnection connection : group.getConnections()) {
+            map.put(connection.getIdentifier(), connection);
+        }
+
+        for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
+            findAllConnections(childGroup, map);
+        }
+    }
+
+    private void findAllProcessGroups(final VersionedProcessGroup group, final Map<String, VersionedProcessGroup> map) {
+        map.put(group.getIdentifier(), group);
+
+        for (final VersionedProcessGroup child : group.getProcessGroups()) {
+            findAllProcessGroups(child, map);
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java
new file mode 100644 (file)
index 0000000..22ba50b
--- /dev/null
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.util.DefaultPrettyPrinter;
+
+/**
+ * A simple file-based implementation of a Flow Registry Client. Rather than interacting
+ * with an actual Flow Registry, this implementation simply reads flows from disk and writes
+ * them to disk. It is not meant for any production use but is available for testing purposes.
+ */
+public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegistry {
+    private final File directory;
+    private final Map<String, Set<String>> flowNamesByBucket = new HashMap<>();
+    private final JsonFactory jsonFactory = new JsonFactory();
+
+    public FileBasedFlowRegistryClient(final File directory) throws IOException {
+        if (!directory.exists() && !directory.mkdirs()) {
+            throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry");
+        }
+
+        this.directory = directory;
+        recoverBuckets();
+    }
+
+    private void recoverBuckets() throws IOException {
+        final File[] bucketDirs = directory.listFiles();
+        if (bucketDirs == null) {
+            throw new IOException("Could not get listing of directory " + directory);
+        }
+
+        for (final File bucketDir : bucketDirs) {
+            final File[] flowDirs = bucketDir.listFiles();
+            if (flowDirs == null) {
+                throw new IOException("Could not get listing of directory " + bucketDir);
+            }
+
+            final Set<String> flowNames = new HashSet<>();
+            for (final File flowDir : flowDirs) {
+                final File propsFile = new File(flowDir, "flow.properties");
+                if (!propsFile.exists()) {
+                    continue;
+                }
+
+                final Properties properties = new Properties();
+                try (final InputStream in = new FileInputStream(propsFile)) {
+                    properties.load(in);
+                }
+
+                final String flowName = properties.getProperty("name");
+                if (flowName == null) {
+                    continue;
+                }
+
+                flowNames.add(flowName);
+            }
+
+            if (!flowNames.isEmpty()) {
+                flowNamesByBucket.put(bucketDir.getName(), flowNames);
+            }
+        }
+    }
+
+    @Override
+    public FlowRegistry getFlowRegistry(final String registryId) {
+        if (!"default".equals(registryId)) {
+            return null;
+        }
+
+        return this;
+    }
+
+    @Override
+    public String getURL() {
+        return directory.toURI().toString();
+    }
+
+    @Override
+    public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException {
+        Objects.requireNonNull(flow);
+        Objects.requireNonNull(flow.getBucketIdentifier());
+        Objects.requireNonNull(flow.getName());
+
+        // Verify that bucket exists
+        final File bucketDir = new File(directory, flow.getBucketIdentifier());
+        if (!bucketDir.exists()) {
+            throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier());
+        }
+
+        // Verify that there is no flow with the same name in that bucket
+        final Set<String> flowNames = flowNamesByBucket.get(flow.getBucketIdentifier());
+        if (flowNames != null && flowNames.contains(flow.getName())) {
+            throw new IllegalArgumentException("Flow with name '" + flow.getName() + "' already exists for Bucket with ID " + flow.getBucketIdentifier());
+        }
+
+        final String flowIdentifier = UUID.randomUUID().toString();
+        final File flowDir = new File(bucketDir, flowIdentifier);
+        if (!flowDir.mkdirs()) {
+            throw new IOException("Failed to create directory " + flowDir + " for new Flow");
+        }
+
+        final File propertiesFile = new File(flowDir, "flow.properties");
+
+        final Properties flowProperties = new Properties();
+        flowProperties.setProperty("name", flow.getName());
+        flowProperties.setProperty("created", String.valueOf(flow.getCreatedTimestamp()));
+        flowProperties.setProperty("description", flow.getDescription());
+        flowProperties.setProperty("lastModified", String.valueOf(flow.getModifiedTimestamp()));
+
+        try (final OutputStream out = new FileOutputStream(propertiesFile)) {
+            flowProperties.store(out, null);
+        }
+
+        final VersionedFlow response = new VersionedFlow();
+        response.setBucketIdentifier(flow.getBucketIdentifier());
+        response.setCreatedTimestamp(flow.getCreatedTimestamp());
+        response.setDescription(flow.getDescription());
+        response.setIdentifier(flowIdentifier);
+        response.setModifiedTimestamp(flow.getModifiedTimestamp());
+        response.setName(flow.getName());
+
+        return response;
+    }
+
+    @Override
+    public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments)
+            throws IOException, UnknownResourceException {
+        Objects.requireNonNull(flow);
+        Objects.requireNonNull(flow.getBucketIdentifier());
+        Objects.requireNonNull(flow.getName());
+        Objects.requireNonNull(snapshot);
+
+        // Verify that the bucket exists
+        final File bucketDir = new File(directory, flow.getBucketIdentifier());
+        if (!bucketDir.exists()) {
+            throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier());
+        }
+
+        // Verify that the flow exists
+        final File flowDir = new File(bucketDir, flow.getIdentifier());
+        if (!flowDir.exists()) {
+            throw new UnknownResourceException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier());
+        }
+
+        final File[] versionDirs = flowDir.listFiles();
+        if (versionDirs == null) {
+            throw new IOException("Unable to perform listing of directory " + flowDir);
+        }
+
+        int maxVersion = 0;
+        for (final File versionDir : versionDirs) {
+            final String versionName = versionDir.getName();
+
+            final int version;
+            try {
+                version = Integer.parseInt(versionName);
+            } catch (final NumberFormatException nfe) {
+                continue;
+            }
+
+            if (version > maxVersion) {
+                maxVersion = version;
+            }
+        }
+
+        final int snapshotVersion = maxVersion + 1;
+        final File snapshotDir = new File(flowDir, String.valueOf(snapshotVersion));
+        if (!snapshotDir.mkdir()) {
+            throw new IOException("Could not create directory " + snapshotDir);
+        }
+
+        final File contentsFile = new File(snapshotDir, "flow.xml");
+
+        try (final OutputStream out = new FileOutputStream(contentsFile);
+            final JsonGenerator generator = jsonFactory.createJsonGenerator(out)) {
+            generator.setCodec(new ObjectMapper());
+            generator.setPrettyPrinter(new DefaultPrettyPrinter());
+            generator.writeObject(snapshot);
+        }
+
+        final Properties snapshotProperties = new Properties();
+        snapshotProperties.setProperty("comments", comments);
+        snapshotProperties.setProperty("name", flow.getName());
+        final File snapshotPropsFile = new File(snapshotDir, "snapshot.properties");
+        try (final OutputStream out = new FileOutputStream(snapshotPropsFile)) {
+            snapshotProperties.store(out, null);
+        }
+
+        final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
+        snapshotMetadata.setBucketIdentifier(flow.getBucketIdentifier());
+        snapshotMetadata.setComments(comments);
+        snapshotMetadata.setFlowIdentifier(flow.getIdentifier());
+        snapshotMetadata.setFlowName(flow.getName());
+        snapshotMetadata.setTimestamp(System.currentTimeMillis());
+        snapshotMetadata.setVersion(snapshotVersion);
+
+        final VersionedFlowSnapshot response = new VersionedFlowSnapshot();
+        response.setSnapshotMetadata(snapshotMetadata);
+        response.setFlowContents(snapshot);
+        return response;
+    }
+
+    @Override
+    public Set<String> getRegistryIdentifiers() {
+        return Collections.singleton("default");
+    }
+
+    @Override
+    public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+        // Verify that the bucket exists
+        final File bucketDir = new File(directory, bucketId);
+        if (!bucketDir.exists()) {
+            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+        }
+
+        // Verify that the flow exists
+        final File flowDir = new File(bucketDir, flowId);
+        if (!flowDir.exists()) {
+            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
+        }
+
+        final File[] versionDirs = flowDir.listFiles();
+        if (versionDirs == null) {
+            throw new IOException("Unable to perform listing of directory " + flowDir);
+        }
+
+        int maxVersion = 0;
+        for (final File versionDir : versionDirs) {
+            final String versionName = versionDir.getName();
+
+            final int version;
+            try {
+                version = Integer.parseInt(versionName);
+            } catch (final NumberFormatException nfe) {
+                continue;
+            }
+
+            if (version > maxVersion) {
+                maxVersion = version;
+            }
+        }
+
+        return maxVersion;
+    }
+
+    @Override
+    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, UnknownResourceException {
+        // Verify that the bucket exists
+        final File bucketDir = new File(directory, bucketId);
+        if (!bucketDir.exists()) {
+            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+        }
+
+        // Verify that the flow exists
+        final File flowDir = new File(bucketDir, flowId);
+        if (!flowDir.exists()) {
+            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
+        }
+
+        final File versionDir = new File(flowDir, String.valueOf(version));
+        if (!versionDir.exists()) {
+            throw new UnknownResourceException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version);
+        }
+
+        final File contentsFile = new File(versionDir, "flow.xml");
+
+        final VersionedProcessGroup processGroup;
+        try (final JsonParser parser = jsonFactory.createJsonParser(contentsFile)) {
+            final ObjectMapper mapper = new ObjectMapper();
+            mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+            parser.setCodec(mapper);
+            processGroup = parser.readValueAs(VersionedProcessGroup.class);
+        }
+
+        final Properties properties = new Properties();
+        final File snapshotPropsFile = new File(versionDir, "snapshot.properties");
+        try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
+            properties.load(in);
+        }
+
+        final String comments = properties.getProperty("comments");
+        final String flowName = properties.getProperty("name");
+
+        final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
+        snapshotMetadata.setBucketIdentifier(bucketId);
+        snapshotMetadata.setComments(comments);
+        snapshotMetadata.setFlowIdentifier(flowId);
+        snapshotMetadata.setFlowName(flowName);
+        snapshotMetadata.setTimestamp(System.currentTimeMillis());
+        snapshotMetadata.setVersion(version);
+
+        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+        snapshot.setFlowContents(processGroup);
+        snapshot.setSnapshotMetadata(snapshotMetadata);
+
+        return snapshot;
+    }
+
+    @Override
+    public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+        // Verify that the bucket exists
+        final File bucketDir = new File(directory, bucketId);
+        if (!bucketDir.exists()) {
+            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+        }
+
+        // Verify that the flow exists
+        final File flowDir = new File(bucketDir, flowId);
+        if (!flowDir.exists()) {
+            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
+        }
+
+        final File flowPropsFile = new File(flowDir, "flow.properties");
+        final Properties flowProperties = new Properties();
+        try (final InputStream in = new FileInputStream(flowPropsFile)) {
+            flowProperties.load(in);
+        }
+
+        final VersionedFlow flow = new VersionedFlow();
+        flow.setBucketIdentifier(bucketId);
+        flow.setCreatedTimestamp(Long.parseLong(flowProperties.getProperty("created")));
+        flow.setDescription(flowProperties.getProperty("description"));
+        flow.setIdentifier(flowId);
+        flow.setModifiedTimestamp(flowDir.lastModified());
+        flow.setName(flowProperties.getProperty("name"));
+
+        final Comparator<VersionedFlowSnapshotMetadata> versionComparator = (a, b) -> Integer.compare(a.getVersion(), b.getVersion());
+
+        final SortedSet<VersionedFlowSnapshotMetadata> snapshotMetadataSet = new TreeSet<>(versionComparator);
+        flow.setSnapshotMetadata(snapshotMetadataSet);
+
+        final File[] versionDirs = flowDir.listFiles();
+        for (final File file : versionDirs) {
+            if (!file.isDirectory()) {
+                continue;
+            }
+
+            int version;
+            try {
+                version = Integer.parseInt(file.getName());
+            } catch (final NumberFormatException nfe) {
+                // not a version. skip.
+                continue;
+            }
+
+            final File snapshotPropsFile = new File(file, "snapshot.properties");
+            final Properties snapshotProperties = new Properties();
+            try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
+                snapshotProperties.load(in);
+            }
+
+            final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
+            metadata.setBucketIdentifier(bucketId);
+            metadata.setComments(snapshotProperties.getProperty("comments"));
+            metadata.setFlowIdentifier(flowId);
+            metadata.setFlowName(snapshotProperties.getProperty("name"));
+            metadata.setTimestamp(file.lastModified());
+            metadata.setVersion(version);
+
+            snapshotMetadataSet.add(metadata);
+        }
+
+        return flow;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/StandardVersionControlInformation.java
new file mode 100644 (file)
index 0000000..41b98ed
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.util.Optional;
+
+public class StandardVersionControlInformation implements VersionControlInformation {
+
+    private final String registryIdentifier;
+    private final String bucketIdentifier;
+    private final String flowIdentifier;
+    private final int version;
+    private volatile VersionedProcessGroup flowSnapshot;
+    private volatile Boolean modified = null;
+    private volatile Boolean current = null;
+
+    public StandardVersionControlInformation(final String registryId, final String bucketId, final String flowId, final int version,
+        final VersionedProcessGroup snapshot, final Boolean modified, final Boolean current) {
+        this.registryIdentifier = registryId;
+        this.bucketIdentifier = bucketId;
+        this.flowIdentifier = flowId;
+        this.version = version;
+        this.flowSnapshot = snapshot;
+        this.modified = modified;
+        this.current = current;
+    }
+
+    @Override
+    public String getRegistryIdentifier() {
+        return registryIdentifier;
+    }
+
+    @Override
+    public String getBucketIdentifier() {
+        return bucketIdentifier;
+    }
+
+    @Override
+    public String getFlowIdentifier() {
+        return flowIdentifier;
+    }
+
+    @Override
+    public int getVersion() {
+        return version;
+    }
+
+    @Override
+    public Optional<Boolean> getModified() {
+        return Optional.ofNullable(modified);
+    }
+
+    @Override
+    public Optional<Boolean> getCurrent() {
+        return Optional.ofNullable(current);
+    }
+
+    @Override
+    public VersionedProcessGroup getFlowSnapshot() {
+        return flowSnapshot;
+    }
+
+    public void setModified(final boolean modified) {
+        this.modified = modified;
+    }
+
+    public void setCurrent(final boolean current) {
+        this.current = current;
+    }
+
+    public void setFlowSnapshot(final VersionedProcessGroup flowSnapshot) {
+        this.flowSnapshot = flowSnapshot;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedConnectableComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedConnectableComponent.java
new file mode 100644 (file)
index 0000000..26ad300
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import org.apache.nifi.registry.flow.ConnectableComponent;
+
+public class InstantiatedConnectableComponent extends ConnectableComponent implements InstantiatedVersionedComponent {
+    private final String instanceId;
+    private final String groupId;
+
+    public InstantiatedConnectableComponent(final String instanceId, final String instanceGroupId) {
+        this.instanceId = instanceId;
+        this.groupId = instanceGroupId;
+    }
+
+    @Override
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    @Override
+    public String getInstanceGroupId() {
+        return groupId;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedComponent.java
new file mode 100644 (file)
index 0000000..15c620a
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+public interface InstantiatedVersionedComponent {
+    String getInstanceId();
+
+    String getInstanceGroupId();
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedConnection.java
new file mode 100644 (file)
index 0000000..d18733a
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import org.apache.nifi.registry.flow.VersionedConnection;
+
+public class InstantiatedVersionedConnection extends VersionedConnection implements InstantiatedVersionedComponent {
+    private final String instanceId;
+    private final String groupId;
+
+    public InstantiatedVersionedConnection(final String instanceId, final String instanceGroupId) {
+        this.instanceId = instanceId;
+        this.groupId = instanceGroupId;
+    }
+
+    @Override
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    @Override
+    public String getInstanceGroupId() {
+        return groupId;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedControllerService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedControllerService.java
new file mode 100644 (file)
index 0000000..0617cd5
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import org.apache.nifi.registry.flow.VersionedControllerService;
+
+public class InstantiatedVersionedControllerService extends VersionedControllerService implements InstantiatedVersionedComponent {
+    private final String instanceId;
+    private final String groupId;
+
+    public InstantiatedVersionedControllerService(final String instanceId, final String instanceGroupId) {
+        this.instanceId = instanceId;
+        this.groupId = instanceGroupId;
+    }
+
+    @Override
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    @Override
+    public String getInstanceGroupId() {
+        return groupId;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedFunnel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedFunnel.java
new file mode 100644 (file)
index 0000000..6b1f230
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import org.apache.nifi.registry.flow.VersionedFunnel;
+
+public class InstantiatedVersionedFunnel extends VersionedFunnel implements InstantiatedVersionedComponent {
+    private final String instanceId;
+    private final String groupId;
+
+    public InstantiatedVersionedFunnel(final String instanceId, final String instanceGroupId) {
+        this.instanceId = instanceId;
+        this.groupId = instanceGroupId;
+    }
+
+    @Override
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    @Override
+    public String getInstanceGroupId() {
+        return groupId;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedLabel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedLabel.java
new file mode 100644 (file)
index 0000000..1c061c8
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import org.apache.nifi.registry.flow.VersionedLabel;
+
+public class InstantiatedVersionedLabel extends VersionedLabel implements InstantiatedVersionedComponent {
+    private final String instanceId;
+    private final String groupId;
+
+    public InstantiatedVersionedLabel(final String instanceId, final String instanceGroupId) {
+        this.instanceId = instanceId;
+        this.groupId = instanceGroupId;
+    }
+
+    @Override
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    @Override
+    public String getInstanceGroupId() {
+        return groupId;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedPort.java
new file mode 100644 (file)
index 0000000..b3ddb40
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import org.apache.nifi.registry.flow.VersionedPort;
+
+public class InstantiatedVersionedPort extends VersionedPort implements InstantiatedVersionedComponent {
+    private final String instanceId;
+    private final String groupId;
+
+    public InstantiatedVersionedPort(final String instanceId, final String instanceGroupId) {
+        this.instanceId = instanceId;
+        this.groupId = instanceGroupId;
+    }
+
+    @Override
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    @Override
+    public String getInstanceGroupId() {
+        return groupId;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessGroup.java
new file mode 100644 (file)
index 0000000..a669220
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+
+public class InstantiatedVersionedProcessGroup extends VersionedProcessGroup implements InstantiatedVersionedComponent {
+    private final String instanceId;
+    private final String groupId;
+
+    public InstantiatedVersionedProcessGroup(final String instanceId, final String instanceGroupId) {
+        this.instanceId = instanceId;
+        this.groupId = instanceGroupId;
+    }
+
+    @Override
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    @Override
+    public String getInstanceGroupId() {
+        return groupId;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedProcessor.java
new file mode 100644 (file)
index 0000000..2763e9d
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import org.apache.nifi.registry.flow.VersionedProcessor;
+
+public class InstantiatedVersionedProcessor extends VersionedProcessor implements InstantiatedVersionedComponent {
+    private final String instanceId;
+    private final String groupId;
+
+    public InstantiatedVersionedProcessor(final String instanceId, final String instanceGroupId) {
+        this.instanceId = instanceId;
+        this.groupId = instanceGroupId;
+    }
+
+    @Override
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    @Override
+    public String getInstanceGroupId() {
+        return groupId;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteGroupPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteGroupPort.java
new file mode 100644 (file)
index 0000000..27805fa
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
+
+public class InstantiatedVersionedRemoteGroupPort extends VersionedRemoteGroupPort implements InstantiatedVersionedComponent {
+    private final String instanceId;
+    private final String remoteGroupId;
+
+    public InstantiatedVersionedRemoteGroupPort(final String instanceId, final String instanceRemoteGroupId) {
+        this.instanceId = instanceId;
+        this.remoteGroupId = instanceRemoteGroupId;
+    }
+
+    @Override
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    @Override
+    public String getInstanceGroupId() {
+        return remoteGroupId;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/InstantiatedVersionedRemoteProcessGroup.java
new file mode 100644 (file)
index 0000000..57816ec
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
+
+public class InstantiatedVersionedRemoteProcessGroup extends VersionedRemoteProcessGroup implements InstantiatedVersionedComponent {
+    private final String instanceId;
+    private final String groupId;
+
+    public InstantiatedVersionedRemoteProcessGroup(final String instanceId, final String instanceGroupId) {
+        this.instanceId = instanceId;
+        this.groupId = instanceGroupId;
+    }
+
+    @Override
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    @Override
+    public String getInstanceGroupId() {
+        return groupId;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryDtoMapper.java
new file mode 100644 (file)
index 0000000..c3c1037
--- /dev/null
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.registry.flow.BatchSize;
+import org.apache.nifi.registry.flow.Bundle;
+import org.apache.nifi.registry.flow.ComponentType;
+import org.apache.nifi.registry.flow.ConnectableComponent;
+import org.apache.nifi.registry.flow.ConnectableComponentType;
+import org.apache.nifi.registry.flow.ControllerServiceAPI;
+import org.apache.nifi.registry.flow.PortType;
+import org.apache.nifi.registry.flow.Position;
+import org.apache.nifi.registry.flow.VersionedConnection;
+import org.apache.nifi.registry.flow.VersionedControllerService;
+import org.apache.nifi.registry.flow.VersionedFunnel;
+import org.apache.nifi.registry.flow.VersionedLabel;
+import org.apache.nifi.registry.flow.VersionedPort;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.VersionedProcessor;
+import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.web.api.dto.BatchSettingsDTO;
+import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ConnectableDTO;
+import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceApiDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.FunnelDTO;
+import org.apache.nifi.web.api.dto.LabelDTO;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.apache.nifi.web.api.dto.PositionDTO;
+import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
+
+
+public class NiFiRegistryDtoMapper {
+    // We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when
+    // we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned'
+    // identifier based on the comopnent's actual id. We do connections last, so that all components will already have been
+    // created before attempting to create the connection, where the ConnectableDTO is converted.
+    private Map<String, String> versionedComponentIds = new HashMap<>();
+
+    public VersionedProcessGroup mapProcessGroup(final ProcessGroupDTO dto) {
+        versionedComponentIds.clear();
+        return mapGroup(dto);
+    }
+
+    private VersionedProcessGroup mapGroup(final ProcessGroupDTO dto) {
+        final VersionedProcessGroup versionedGroup = new VersionedProcessGroup();
+        versionedGroup.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
+        versionedGroup.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
+        versionedGroup.setName(dto.getName());
+        versionedGroup.setComments(dto.getComments());
+        versionedGroup.setPosition(mapPosition(dto.getPosition()));
+
+        final FlowSnippetDTO contents = dto.getContents();
+
+        versionedGroup.setControllerServices(contents.getControllerServices().stream()
+            .map(this::mapControllerService)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setFunnels(contents.getFunnels().stream()
+            .map(this::mapFunnel)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setInputPorts(contents.getInputPorts().stream()
+            .map(this::mapPort)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setOutputPorts(contents.getOutputPorts().stream()
+            .map(this::mapPort)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setLabels(contents.getLabels().stream()
+            .map(this::mapLabel)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setProcessors(contents.getProcessors().stream()
+            .map(this::mapProcessor)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setRemoteProcessGroups(contents.getRemoteProcessGroups().stream()
+            .map(this::mapRemoteProcessGroup)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setProcessGroups(contents.getProcessGroups().stream()
+            .map(this::mapGroup)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setConnections(contents.getConnections().stream()
+            .map(this::mapConnection)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        return versionedGroup;
+    }
+
+    private String getId(final String currentVersionedId, final String componentId) {
+        final String versionedId;
+        if (currentVersionedId == null) {
+            versionedId = UUID.nameUUIDFromBytes(componentId.getBytes(StandardCharsets.UTF_8)).toString();
+        } else {
+            versionedId = currentVersionedId;
+        }
+
+        versionedComponentIds.put(componentId, versionedId);
+        return versionedId;
+    }
+
+    private String getGroupId(final String groupId) {
+        return versionedComponentIds.get(groupId);
+    }
+
+    public VersionedConnection mapConnection(final ConnectionDTO dto) {
+        final VersionedConnection connection = new VersionedConnection();
+        connection.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
+        connection.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
+        connection.setName(dto.getName());
+        connection.setBackPressureDataSizeThreshold(dto.getBackPressureDataSizeThreshold());
+        connection.setBackPressureObjectThreshold(dto.getBackPressureObjectThreshold());
+        connection.setFlowFileExpiration(dto.getFlowFileExpiration());
+        connection.setLabelIndex(dto.getLabelIndex());
+        connection.setPosition(mapPosition(dto.getPosition()));
+        connection.setPrioritizers(dto.getPrioritizers());
+        connection.setSelectedRelationships(dto.getSelectedRelationships());
+        connection.setzIndex(dto.getzIndex());
+
+        connection.setBends(dto.getBends().stream()
+            .map(this::mapPosition)
+            .collect(Collectors.toList()));
+
+        connection.setSource(mapConnectable(dto.getSource()));
+        connection.setDestination(mapConnectable(dto.getDestination()));
+
+        return connection;
+    }
+
+    public ConnectableComponent mapConnectable(final ConnectableDTO dto) {
+        final ConnectableComponent component = new ConnectableComponent();
+
+        final String versionedId = dto.getVersionedComponentId();
+        if (versionedId == null) {
+            final String resolved = versionedComponentIds.get(dto.getId());
+            if (resolved == null) {
+                throw new IllegalArgumentException("Unable to map Connectable Component with identifier " + dto.getId() + " to any version-controlled component");
+            }
+
+            component.setId(resolved);
+        } else {
+            component.setId(versionedId);
+        }
+
+        component.setComments(dto.getComments());
+        component.setGroupId(dto.getGroupId());
+        component.setName(dto.getName());
+        component.setType(ConnectableComponentType.valueOf(dto.getType()));
+        return component;
+    }
+
+    public VersionedControllerService mapControllerService(final ControllerServiceDTO dto) {
+        final VersionedControllerService service = new VersionedControllerService();
+        service.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
+        service.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
+        service.setName(dto.getName());
+        service.setAnnotationData(dto.getAnnotationData());
+        service.setBundle(mapBundle(dto.getBundle()));
+        service.setComments(dto.getComments());
+        service.setControllerServiceApis(dto.getControllerServiceApis().stream()
+            .map(this::mapControllerServiceApi)
+            .collect(Collectors.toList()));
+        service.setProperties(dto.getProperties());
+        service.setType(dto.getType());
+        return null;
+    }
+
+    private Bundle mapBundle(final BundleDTO dto) {
+        final Bundle bundle = new Bundle();
+        bundle.setGroup(dto.getGroup());
+        bundle.setArtifact(dto.getArtifact());
+        bundle.setVersion(dto.getVersion());
+        return bundle;
+    }
+
+    private ControllerServiceAPI mapControllerServiceApi(final ControllerServiceApiDTO dto) {
+        final ControllerServiceAPI api = new ControllerServiceAPI();
+        api.setBundle(mapBundle(dto.getBundle()));
+        api.setType(dto.getType());
+        return api;
+    }
+
+    public VersionedFunnel mapFunnel(final FunnelDTO dto) {
+        final VersionedFunnel funnel = new VersionedFunnel();
+        funnel.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
+        funnel.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
+        funnel.setPosition(mapPosition(dto.getPosition()));
+        return funnel;
+    }
+
+    public VersionedLabel mapLabel(final LabelDTO dto) {
+        final VersionedLabel label = new VersionedLabel();
+        label.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
+        label.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
+        label.setHeight(dto.getHeight());
+        label.setWidth(dto.getWidth());
+        label.setLabel(dto.getLabel());
+        label.setPosition(mapPosition(dto.getPosition()));
+        label.setStyle(dto.getStyle());
+        return label;
+    }
+
+    public VersionedPort mapPort(final PortDTO dto) {
+        final VersionedPort port = new VersionedPort();
+        port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
+        port.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
+        port.setComments(dto.getComments());
+        port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount());
+        port.setName(dto.getName());
+        port.setPosition(mapPosition(dto.getPosition()));
+        port.setType(PortType.valueOf(dto.getType()));
+        return port;
+    }
+
+    public Position mapPosition(final PositionDTO dto) {
+        final Position position = new Position();
+        position.setX(dto.getX());
+        position.setY(dto.getY());
+        return position;
+    }
+
+    public VersionedProcessor mapProcessor(final ProcessorDTO dto) {
+        final ProcessorConfigDTO config = dto.getConfig();
+
+        final VersionedProcessor processor = new VersionedProcessor();
+        processor.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
+        processor.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
+        processor.setType(dto.getType());
+        processor.setAnnotationData(config.getAnnotationData());
+        processor.setAutoTerminatedRelationships(config.getAutoTerminatedRelationships());
+        processor.setBulletinLevel(config.getBulletinLevel());
+        processor.setBundle(mapBundle(dto.getBundle()));
+        processor.setComments(config.getComments());
+        processor.setConcurrentlySchedulableTaskCount(config.getConcurrentlySchedulableTaskCount());
+        processor.setExecutionNode(config.getExecutionNode());
+        processor.setName(dto.getName());
+        processor.setPenaltyDuration(config.getPenaltyDuration());
+        processor.setPosition(mapPosition(dto.getPosition()));
+        processor.setProperties(config.getProperties());
+        processor.setRunDurationMillis(config.getRunDurationMillis());
+        processor.setSchedulingPeriod(config.getSchedulingPeriod());
+        processor.setSchedulingStrategy(config.getSchedulingStrategy());
+        processor.setStyle(dto.getStyle());
+        processor.setYieldDuration(config.getYieldDuration());
+        return processor;
+    }
+
+    public VersionedRemoteProcessGroup mapRemoteProcessGroup(final RemoteProcessGroupDTO dto) {
+        final VersionedRemoteProcessGroup rpg = new VersionedRemoteProcessGroup();
+        rpg.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
+        rpg.setGroupIdentifier(getGroupId(dto.getParentGroupId()));
+        rpg.setComments(dto.getComments());
+        rpg.setCommunicationsTimeout(dto.getCommunicationsTimeout());
+        rpg.setLocalNetworkInterface(dto.getLocalNetworkInterface());
+        rpg.setName(dto.getName());
+        rpg.setInputPorts(dto.getContents().getInputPorts().stream()
+            .map(port -> mapRemotePort(port, ComponentType.REMOTE_INPUT_PORT))
+            .collect(Collectors.toSet()));
+        rpg.setOutputPorts(dto.getContents().getOutputPorts().stream()
+            .map(port -> mapRemotePort(port, ComponentType.REMOTE_OUTPUT_PORT))
+            .collect(Collectors.toSet()));
+        rpg.setPosition(mapPosition(dto.getPosition()));
+        rpg.setProxyHost(dto.getProxyHost());
+        rpg.setProxyPort(dto.getProxyPort());
+        rpg.setProxyUser(dto.getProxyUser());
+        rpg.setTargetUri(dto.getTargetUri());
+        rpg.setTargetUris(dto.getTargetUris());
+        rpg.setTransportProtocol(dto.getTransportProtocol());
+        rpg.setYieldDuration(dto.getYieldDuration());
+        return rpg;
+    }
+
+    public VersionedRemoteGroupPort mapRemotePort(final RemoteProcessGroupPortDTO dto, final ComponentType componentType) {
+        final VersionedRemoteGroupPort port = new VersionedRemoteGroupPort();
+        port.setIdentifier(getId(dto.getVersionedComponentId(), dto.getId()));
+        port.setGroupIdentifier(getGroupId(dto.getGroupId()));
+        port.setComments(dto.getComments());
+        port.setConcurrentlySchedulableTaskCount(dto.getConcurrentlySchedulableTaskCount());
+        port.setGroupId(dto.getGroupId());
+        port.setName(dto.getName());
+        port.setUseCompression(dto.getUseCompression());
+        port.setBatchSettings(mapBatchSettings(dto.getBatchSettings()));
+        port.setComponentType(componentType);
+        return port;
+    }
+
+    private BatchSize mapBatchSettings(final BatchSettingsDTO dto) {
+        final BatchSize batchSize = new BatchSize();
+        batchSize.setCount(dto.getCount());
+        batchSize.setDuration(dto.getDuration());
+        batchSize.setSize(dto.getSize());
+        return batchSize;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
new file mode 100644 (file)
index 0000000..e3edc30
--- /dev/null
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow.mapping;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ConfiguredComponent;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.flow.BatchSize;
+import org.apache.nifi.registry.flow.Bundle;
+import org.apache.nifi.registry.flow.ComponentType;
+import org.apache.nifi.registry.flow.ConnectableComponent;
+import org.apache.nifi.registry.flow.ConnectableComponentType;
+import org.apache.nifi.registry.flow.ControllerServiceAPI;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.PortType;
+import org.apache.nifi.registry.flow.Position;
+import org.apache.nifi.registry.flow.RemoteFlowCoordinates;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedConnection;
+import org.apache.nifi.registry.flow.VersionedControllerService;
+import org.apache.nifi.registry.flow.VersionedFunnel;
+import org.apache.nifi.registry.flow.VersionedLabel;
+import org.apache.nifi.registry.flow.VersionedPort;
+import org.apache.nifi.registry.flow.VersionedProcessor;
+import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.remote.RemoteGroupPort;
+
+
+public class NiFiRegistryFlowMapper {
+    // We need to keep a mapping of component id to versionedComponentId as we transform these objects. This way, when
+    // we call #mapConnectable, instead of generating a new UUID for the ConnectableComponent, we can lookup the 'versioned'
+    // identifier based on the comopnent's actual id. We do connections last, so that all components will already have been
+    // created before attempting to create the connection, where the ConnectableDTO is converted.
+    private Map<String, String> versionedComponentIds = new HashMap<>();
+
+    public InstantiatedVersionedProcessGroup mapProcessGroup(final ProcessGroup group, final FlowRegistryClient registryClient) {
+        versionedComponentIds.clear();
+        return mapGroup(group, registryClient, true);
+    }
+
+    private InstantiatedVersionedProcessGroup mapGroup(final ProcessGroup group, final FlowRegistryClient registryClient, final boolean topLevel) {
+        final InstantiatedVersionedProcessGroup versionedGroup = new InstantiatedVersionedProcessGroup(group.getIdentifier(), group.getProcessGroupIdentifier());
+        versionedGroup.setIdentifier(getId(group.getVersionedComponentId(), group.getIdentifier()));
+        versionedGroup.setGroupIdentifier(getGroupId(group.getProcessGroupIdentifier()));
+        versionedGroup.setName(group.getName());
+        versionedGroup.setComments(group.getComments());
+        versionedGroup.setPosition(mapPosition(group.getPosition()));
+
+        // If we are at the 'top level', meaning that the given Process Group is the group that we are creating a VersionedProcessGroup for,
+        // then we don't want to include the RemoteFlowCoordinates; we want to include the group contents. The RemoteFlowCoordinates will be used
+        // only for a child group that is itself version controlled.
+        if (!topLevel) {
+            final VersionControlInformation versionControlInfo = group.getVersionControlInformation();
+            if (versionControlInfo != null) {
+                final RemoteFlowCoordinates coordinates = new RemoteFlowCoordinates();
+                final String registryId = versionControlInfo.getRegistryIdentifier();
+                final FlowRegistry registry = registryClient.getFlowRegistry(registryId);
+                if (registry == null) {
+                    throw new IllegalStateException("Process Group refers to a Flow Registry with ID " + registryId + " but no Flow Registry exists with that ID. Cannot resolve to a URL.");
+                }
+
+                coordinates.setRegistryUrl(registry.getURL());
+                coordinates.setBucketId(versionControlInfo.getBucketIdentifier());
+                coordinates.setFlowId(versionControlInfo.getFlowIdentifier());
+                coordinates.setVersion(versionControlInfo.getVersion());
+
+                // If the Process Group itself is remotely versioned, then we don't want to include its contents
+                // because the contents are remotely managed and not part of the versioning of this Process Group
+                return versionedGroup;
+            }
+        }
+
+        versionedGroup.setControllerServices(group.getControllerServices(false).stream()
+            .map(this::mapControllerService)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setFunnels(group.getFunnels().stream()
+            .map(this::mapFunnel)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setInputPorts(group.getInputPorts().stream()
+            .map(this::mapPort)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setOutputPorts(group.getOutputPorts().stream()
+            .map(this::mapPort)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setLabels(group.getLabels().stream()
+            .map(this::mapLabel)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setProcessors(group.getProcessors().stream()
+            .map(this::mapProcessor)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setRemoteProcessGroups(group.getRemoteProcessGroups().stream()
+            .map(this::mapRemoteProcessGroup)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setProcessGroups(group.getProcessGroups().stream()
+            .map(grp -> mapGroup(grp, registryClient, false))
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setConnections(group.getConnections().stream()
+            .map(this::mapConnection)
+            .collect(Collectors.toCollection(LinkedHashSet::new)));
+
+        versionedGroup.setVariables(group.getVariableRegistry().getVariableMap().entrySet().stream()
+            .collect(Collectors.toMap(entry -> entry.getKey().getName(), Map.Entry::getValue)));
+
+        return versionedGroup;
+    }
+
+    private String getId(final Optional<String> currentVersionedId, final String componentId) {
+        final String versionedId;
+        if (currentVersionedId.isPresent()) {
+            versionedId = currentVersionedId.get();
+        } else {
+            versionedId = UUID.nameUUIDFromBytes(componentId.getBytes(StandardCharsets.UTF_8)).toString();
+        }
+
+        versionedComponentIds.put(componentId, versionedId);
+        return versionedId;
+    }
+
+    private String getGroupId(final String groupId) {
+        return versionedComponentIds.get(groupId);
+    }
+
+    public VersionedConnection mapConnection(final Connection connection) {
+        final FlowFileQueue queue = connection.getFlowFileQueue();
+
+        final VersionedConnection versionedConnection = new InstantiatedVersionedConnection(connection.getIdentifier(), connection.getProcessGroup().getIdentifier());
+        versionedConnection.setIdentifier(getId(connection.getVersionedComponentId(), connection.getIdentifier()));
+        versionedConnection.setGroupIdentifier(getGroupId(connection.getProcessGroup().getIdentifier()));
+        versionedConnection.setName(connection.getName());
+        versionedConnection.setBackPressureDataSizeThreshold(queue.getBackPressureDataSizeThreshold());
+        versionedConnection.setBackPressureObjectThreshold(queue.getBackPressureObjectThreshold());
+        versionedConnection.setFlowFileExpiration(queue.getFlowFileExpiration());
+        versionedConnection.setLabelIndex(connection.getLabelIndex());
+        versionedConnection.setPrioritizers(queue.getPriorities().stream().map(p -> p.getClass().getName()).collect(Collectors.toList()));
+        versionedConnection.setSelectedRelationships(connection.getRelationships().stream().map(Relationship::getName).collect(Collectors.toSet()));
+        versionedConnection.setzIndex(connection.getZIndex());
+
+        versionedConnection.setBends(connection.getBendPoints().stream()
+            .map(this::mapPosition)
+            .collect(Collectors.toList()));
+
+        versionedConnection.setSource(mapConnectable(connection.getSource()));
+        versionedConnection.setDestination(mapConnectable(connection.getDestination()));
+
+        return versionedConnection;
+    }
+
+    public ConnectableComponent mapConnectable(final Connectable connectable) {
+        final ConnectableComponent component = new InstantiatedConnectableComponent(connectable.getIdentifier(), connectable.getProcessGroupIdentifier());
+
+        final Optional<String> versionedId = connectable.getVersionedComponentId();
+        if (versionedId.isPresent()) {
+            component.setId(versionedId.get());
+        } else {
+            final String resolved = versionedComponentIds.get(connectable.getIdentifier());
+            if (resolved == null) {
+                throw new IllegalArgumentException("Unable to map Connectable Component with identifier " + connectable.getIdentifier() + " to any version-controlled component");
+            }
+
+            component.setId(resolved);
+        }
+
+        component.setComments(connectable.getComments());
+        component.setGroupId(connectable.getProcessGroupIdentifier());
+        component.setName(connectable.getName());
+        component.setType(ConnectableComponentType.valueOf(connectable.getConnectableType().name()));
+        return component;
+    }
+
+    public VersionedControllerService mapControllerService(final ControllerServiceNode controllerService) {
+        final VersionedControllerService versionedService = new InstantiatedVersionedControllerService(controllerService.getIdentifier(), controllerService.getProcessGroupIdentifier());
+        versionedService.setIdentifier(getId(controllerService.getVersionedComponentId(), controllerService.getIdentifier()));
+        versionedService.setGroupIdentifier(getGroupId(controllerService.getProcessGroupIdentifier()));
+        versionedService.setName(controllerService.getName());
+        versionedService.setAnnotationData(controllerService.getAnnotationData());
+        versionedService.setBundle(mapBundle(controllerService.getBundleCoordinate()));
+        versionedService.setComments(controllerService.getComments());
+
+        versionedService.setControllerServiceApis(mapControllerServiceApis(controllerService));
+        versionedService.setProperties(mapProperties(controllerService));
+        versionedService.setType(controllerService.getCanonicalClassName());
+
+        return versionedService;
+    }
+
+    private Map<String, String> mapProperties(final ConfiguredComponent component) {
+        final Map<String, String> mapped = new HashMap<>();
+        component.getProperties().keySet().stream()
+            .forEach(property -> {
+                String value = component.getProperty(property);
+                if (value == null) {
+                    value = property.getDefaultValue();
+                }
+                mapped.put(property.getName(), value);
+            });
+        return mapped;
+    }
+
+    private Bundle mapBundle(final BundleCoordinate coordinate) {
+        final Bundle versionedBundle = new Bundle();
+        versionedBundle.setGroup(coordinate.getGroup());
+        versionedBundle.setArtifact(coordinate.getId());
+        versionedBundle.setVersion(coordinate.getVersion());
+        return versionedBundle;
+    }
+
+    private List<ControllerServiceAPI> mapControllerServiceApis(final ControllerServiceNode service) {
+        final Class<?> serviceClass = service.getControllerServiceImplementation().getClass();
+
+        final Set<Class<?>> serviceApiClasses = new HashSet<>();
+        // get all of it's interfaces to determine the controller service api's it implements
+        final List<Class<?>> interfaces = ClassUtils.getAllInterfaces(serviceClass);
+        for (final Class<?> i : interfaces) {
+            // add all controller services that's not ControllerService itself
+            if (ControllerService.class.isAssignableFrom(i) && !ControllerService.class.equals(i)) {
+                serviceApiClasses.add(i);
+            }
+        }
+
+
+        final List<ControllerServiceAPI> serviceApis = new ArrayList<>();
+        for (final Class<?> serviceApiClass : serviceApiClasses) {
+            final BundleCoordinate bundleCoordinate = ExtensionManager.getBundle(serviceApiClass.getClassLoader()).getBundleDetails().getCoordinate();
+
+            final ControllerServiceAPI serviceApi = new ControllerServiceAPI();
+            serviceApi.setType(serviceApiClass.getName());
+            serviceApi.setBundle(mapBundle(bundleCoordinate));
+            serviceApis.add(serviceApi);
+        }
+        return serviceApis;
+    }
+
+
+    public VersionedFunnel mapFunnel(final Funnel funnel) {
+        final VersionedFunnel versionedFunnel = new InstantiatedVersionedFunnel(funnel.getIdentifier(), funnel.getProcessGroupIdentifier());
+        versionedFunnel.setIdentifier(getId(funnel.getVersionedComponentId(), funnel.getIdentifier()));
+        versionedFunnel.setGroupIdentifier(getGroupId(funnel.getProcessGroupIdentifier()));
+        versionedFunnel.setPosition(mapPosition(funnel.getPosition()));
+
+        return versionedFunnel;
+    }
+
+    public VersionedLabel mapLabel(final Label label) {
+        final VersionedLabel versionedLabel = new InstantiatedVersionedLabel(label.getIdentifier(), label.getProcessGroupIdentifier());
+        versionedLabel.setIdentifier(getId(label.getVersionedComponentId(), label.getIdentifier()));
+        versionedLabel.setGroupIdentifier(getGroupId(label.getProcessGroupIdentifier()));
+        versionedLabel.setHeight(label.getSize().getHeight());
+        versionedLabel.setWidth(label.getSize().getWidth());
+        versionedLabel.setLabel(label.getValue());
+        versionedLabel.setPosition(mapPosition(label.getPosition()));
+        versionedLabel.setStyle(label.getStyle());
+
+        return versionedLabel;
+    }
+
+    public VersionedPort mapPort(final Port port) {
+        final VersionedPort versionedPort = new InstantiatedVersionedPort(port.getIdentifier(), port.getProcessGroupIdentifier());
+        versionedPort.setIdentifier(getId(port.getVersionedComponentId(), port.getIdentifier()));
+        versionedPort.setGroupIdentifier(getGroupId(port.getProcessGroupIdentifier()));
+        versionedPort.setComments(port.getComments());
+        versionedPort.setConcurrentlySchedulableTaskCount(port.getMaxConcurrentTasks());
+        versionedPort.setName(port.getName());
+        versionedPort.setPosition(mapPosition(port.getPosition()));
+        versionedPort.setType(PortType.valueOf(port.getComponentType()));
+        return versionedPort;
+    }
+
+    public Position mapPosition(final org.apache.nifi.connectable.Position pos) {
+        final Position position = new Position();
+        position.setX(pos.getX());
+        position.setY(pos.getY());
+        return position;
+    }
+
+    public VersionedProcessor mapProcessor(final ProcessorNode procNode) {
+        final VersionedProcessor processor = new InstantiatedVersionedProcessor(procNode.getIdentifier(), procNode.getProcessGroupIdentifier());
+        processor.setIdentifier(getId(procNode.getVersionedComponentId(), procNode.getIdentifier()));
+        processor.setGroupIdentifier(getGroupId(procNode.getProcessGroupIdentifier()));
+        processor.setType(procNode.getCanonicalClassName());
+        processor.setAnnotationData(procNode.getAnnotationData());
+        processor.setAutoTerminatedRelationships(procNode.getAutoTerminatedRelationships().stream().map(Relationship::getName).collect(Collectors.toSet()));
+        processor.setBulletinLevel(procNode.getBulletinLevel().name());
+        processor.setBundle(mapBundle(procNode.getBundleCoordinate()));
+        processor.setComments(procNode.getComments());
+        processor.setConcurrentlySchedulableTaskCount(procNode.getMaxConcurrentTasks());
+        processor.setExecutionNode(procNode.getExecutionNode().name());
+        processor.setName(procNode.getName());
+        processor.setPenaltyDuration(procNode.getPenalizationPeriod());
+        processor.setPosition(mapPosition(procNode.getPosition()));
+        processor.setProperties(mapProperties(procNode));
+        processor.setRunDurationMillis(procNode.getRunDuration(TimeUnit.MILLISECONDS));
+        processor.setSchedulingPeriod(procNode.getSchedulingPeriod());
+        processor.setSchedulingStrategy(procNode.getSchedulingStrategy().name());
+        processor.setStyle(procNode.getStyle());
+        processor.setYieldDuration(procNode.getYieldPeriod());
+
+        return processor;
+    }
+
+    public VersionedRemoteProcessGroup mapRemoteProcessGroup(final RemoteProcessGroup remoteGroup) {
+        final VersionedRemoteProcessGroup rpg = new InstantiatedVersionedRemoteProcessGroup(remoteGroup.getIdentifier(), remoteGroup.getProcessGroupIdentifier());
+        rpg.setIdentifier(getId(remoteGroup.getVersionedComponentId(), remoteGroup.getIdentifier()));
+        rpg.setGroupIdentifier(getGroupId(remoteGroup.getProcessGroupIdentifier()));
+        rpg.setComments(remoteGroup.getComments());
+        rpg.setCommunicationsTimeout(remoteGroup.getCommunicationsTimeout());
+        rpg.setLocalNetworkInterface(remoteGroup.getNetworkInterface());
+        rpg.setName(remoteGroup.getName());
+        rpg.setInputPorts(remoteGroup.getInputPorts().stream()
+            .map(port -> mapRemotePort(port, ComponentType.REMOTE_INPUT_PORT))
+            .collect(Collectors.toSet()));
+        rpg.setOutputPorts(remoteGroup.getOutputPorts().stream()
+            .map(port -> mapRemotePort(port, ComponentType.REMOTE_OUTPUT_PORT))
+            .collect(Collectors.toSet()));
+        rpg.setPosition(mapPosition(remoteGroup.getPosition()));
+        rpg.setProxyHost(remoteGroup.getProxyHost());
+        rpg.setProxyPort(remoteGroup.getProxyPort());
+        rpg.setProxyUser(remoteGroup.getProxyUser());
+        rpg.setTargetUri(remoteGroup.getTargetUri());
+        rpg.setTargetUris(remoteGroup.getTargetUris());
+        rpg.setTransportProtocol(remoteGroup.getTransportProtocol().name());
+        rpg.setYieldDuration(remoteGroup.getYieldDuration());
+        return rpg;
+    }
+
+    public VersionedRemoteGroupPort mapRemotePort(final RemoteGroupPort remotePort, final ComponentType componentType) {
+        final VersionedRemoteGroupPort port = new InstantiatedVersionedRemoteGroupPort(remotePort.getIdentifier(), remotePort.getRemoteProcessGroup().getIdentifier());
+        port.setIdentifier(getId(remotePort.getVersionedComponentId(), remotePort.getIdentifier()));
+        port.setGroupIdentifier(getGroupId(remotePort.getRemoteProcessGroup().getIdentifier()));
+        port.setComments(remotePort.getComments());
+        port.setConcurrentlySchedulableTaskCount(remotePort.getMaxConcurrentTasks());
+        port.setGroupId(remotePort.getProcessGroupIdentifier());
+        port.setName(remotePort.getName());
+        port.setUseCompression(remotePort.isUseCompression());
+        port.setBatchSettings(mapBatchSettings(remotePort));
+        port.setComponentType(componentType);
+        return port;
+    }
+
+    private BatchSize mapBatchSettings(final RemoteGroupPort remotePort) {
+        final BatchSize batchSize = new BatchSize();
+        batchSize.setCount(remotePort.getBatchCount());
+        batchSize.setDuration(remotePort.getBatchDuration());
+        batchSize.setSize(remotePort.getBatchSize());
+        return batchSize;
+    }
+}
index 53d5c9f..6b55735 100644 (file)
@@ -34,6 +34,7 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
@@ -109,6 +110,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     private final AtomicReference<String> comments = new AtomicReference<>();
     private final AtomicReference<ProcessGroup> processGroup;
     private final AtomicBoolean transmitting = new AtomicBoolean(false);
+    private final AtomicReference<String> versionedComponentId = new AtomicReference<>();
     private final SSLContext sslContext;
 
     private volatile String communicationsTimeout = "30 sec";
@@ -1419,4 +1421,26 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         return new File(stateDir, getIdentifier() + ".peers");
     }
 
+    @Override
+    public Optional<String> getVersionedComponentId() {
+        return Optional.ofNullable(versionedComponentId.get());
+    }
+
+    @Override
+    public void setVersionedComponentId(final String versionedComponentId) {
+        boolean updated = false;
+        while (!updated) {
+            final String currentId = this.versionedComponentId.get();
+
+            if (currentId == null) {
+                updated = this.versionedComponentId.compareAndSet(null, versionedComponentId);
+            } else if (currentId.equals(versionedComponentId)) {
+                return;
+            } else if (versionedComponentId == null) {
+                updated = this.versionedComponentId.compareAndSet(currentId, null);
+            } else {
+                throw new IllegalStateException(this + " is already under version control");
+            }
+        }
+    }
 }
index 7dbcec1..7d657ce 100644 (file)
@@ -22,6 +22,7 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr
 
     private String id;
     private String targetId;
+    private String versionedComponentId;
     private String groupId;
     private String name;
     private String comments;
@@ -185,4 +186,13 @@ public class StandardRemoteProcessGroupPortDescriptor implements RemoteProcessGr
         }
         return name.equals(other.getName());
     }
+
+    @Override
+    public String getVersionedComponentId() {
+        return versionedComponentId;
+    }
+
+    public void setVersionedComponentId(String versionedId) {
+        this.versionedComponentId = versionedId;
+    }
 }
index 7ed9187..c7a7e7d 100644 (file)
@@ -26,6 +26,7 @@ import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.util.NiFiProperties;
 import org.springframework.beans.BeansException;
@@ -49,6 +50,7 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
     private ClusterCoordinator clusterCoordinator;
     private VariableRegistry variableRegistry;
     private LeaderElectionManager leaderElectionManager;
+    private FlowRegistryClient flowRegistryClient;
 
     @Override
     public Object getObject() throws Exception {
@@ -69,7 +71,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
                     clusterCoordinator,
                     heartbeatMonitor,
                     leaderElectionManager,
-                    variableRegistry);
+                    variableRegistry,
+                    flowRegistryClient);
             } else {
                 flowController = FlowController.createStandaloneInstance(
                     flowFileEventRepository,
@@ -77,7 +80,9 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
                     authorizer,
                     auditService,
                     encryptor,
-                    bulletinRepository, variableRegistry);
+                    bulletinRepository,
+                    variableRegistry,
+                    flowRegistryClient);
             }
 
         }
@@ -133,4 +138,8 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
     public void setLeaderElectionManager(final LeaderElectionManager leaderElectionManager) {
         this.leaderElectionManager = leaderElectionManager;
     }
+
+    public void setFlowRegistryClient(final FlowRegistryClient flowRegistryClient) {
+        this.flowRegistryClient = flowRegistryClient;
+    }
 }
diff --git