NIFI-4975 Add GridFS processors
author Mike Thomsen <mikerthomsen@gmail.com>
Sat, 10 Mar 2018 17:57:28 +0000 (12:57 -0500)
committer Matthew Burgess <mattyb149@apache.org>
Wed, 13 Feb 2019 21:51:17 +0000 (16:51 -0500)
NIFI-4975 Added changes requested in a code review.
NIFI-4975 Reverted some base Mongo changes.
NIFI-4975 Moved connection configuration to using Mongo client service.
NIFI-4975 Fixed a lot of style issues.
NIFI-4975 Removed an EL statement that was causing problems with the UI.
NIFI-4975 Added changes from code review.
NIFI-4975 Added additional details for FetchGridFS.
NIFI-4975 Added documentation for DeleteGridFS.
NIFI-4975 Added documentation for PutGridFS.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>
This closes #2546

15 files changed:
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/QueryHelper.java [new file with mode: 0644]
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/AbstractGridFSProcessor.java [new file with mode: 0644]
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFS.java [new file with mode: 0644]
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFS.java [new file with mode: 0644]
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFS.java [new file with mode: 0644]
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS/additionalDetails.html [new file with mode: 0644]
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.FetchGridFS/additionalDetails.html [new file with mode: 0644]
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.PutGridFS/additionalDetails.html [new file with mode: 0644]
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFSIT.java [new file with mode: 0644]
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFSIT.java [new file with mode: 0644]
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/GridFSITTestBase.java [new file with mode: 0644]
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFSIT.java [new file with mode: 0644]
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java

index 69de94b..43c210c 100644 (file)
@@ -121,24 +121,24 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
             .build();
 
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
-        .name("ssl-context-service")
-        .displayName("SSL Context Service")
-        .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
-                + "connections.")
-        .required(false)
-        .identifiesControllerService(SSLContextService.class)
-        .build();
+            .name("ssl-context-service")
+            .displayName("SSL Context Service")
+            .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
+                    + "connections.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
 
     public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
-        .name("ssl-client-auth")
-        .displayName("Client Auth")
-        .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
-                + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
-                + "has been defined and enabled.")
-        .required(false)
-        .allowableValues(SSLContextService.ClientAuth.values())
-        .defaultValue("REQUIRED")
-        .build();
+            .name("ssl-client-auth")
+            .displayName("Client Auth")
+            .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
+                    + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+                    + "has been defined and enabled.")
+            .required(false)
+            .allowableValues(SSLContextService.ClientAuth.values())
+            .defaultValue("REQUIRED")
+            .build();
 
     public static final PropertyDescriptor WRITE_CONCERN = new PropertyDescriptor.Builder()
             .name("Write Concern")
@@ -341,7 +341,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
     }
 
     protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session,
-            Map<String, String> extraAttributes, Relationship rel) throws UnsupportedEncodingException {
+                              Map<String, String> extraAttributes, Relationship rel) throws UnsupportedEncodingException {
         String charset = context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue();
 
         FlowFile flowFile = parent != null ? session.create(parent) : session.create();
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/QueryHelper.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/QueryHelper.java
new file mode 100644 (file)
index 0000000..c31b016
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public interface QueryHelper {
+    AllowableValue MODE_ONE_COMMIT = new AllowableValue("all-at-once", "Full Query Fetch",
+            "Fetch the entire query result and then make it available to downstream processors.");
+    AllowableValue MODE_MANY_COMMITS = new AllowableValue("streaming", "Stream Query Results",
+            "As results are returned by the query, they are sent to the downstream processors at regular intervals.");
+
+    PropertyDescriptor OPERATION_MODE = new PropertyDescriptor.Builder()
+            .name("mongo-operation-mode")
+            .displayName("Operation Mode")
+            .allowableValues(MODE_ONE_COMMIT, MODE_MANY_COMMITS)
+            .defaultValue(MODE_ONE_COMMIT.getValue())
+            .required(true)
+            .description("This option controls when results are made available to downstream processors. If Stream Query Results is enabled, " +
+                    "provenance will not be tracked relative to the input flowfile if an input flowfile is received and starts the query. In Stream Query Results mode " +
+                    "errors will be handled by sending a new flowfile with the original content and attributes of the input flowfile to the failure " +
+                    "relationship. Streaming should only be used if there is reliable connectivity between MongoDB and NiFi.")
+            .addValidator(Validator.VALID)
+            .build();
+
+    default String readQuery(ProcessContext context, ProcessSession session, PropertyDescriptor queryProp, FlowFile input) throws IOException {
+        String queryStr;
+
+        if (context.getProperty(queryProp).isSet()) {
+            queryStr = context.getProperty(queryProp).evaluateAttributeExpressions(input).getValue();
+        } else if (input == null) {
+            queryStr = "{}";
+        } else {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            session.exportTo(input, out);
+            out.close();
+            queryStr = new String(out.toByteArray());
+        }
+
+        return queryStr;
+    }
+}
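
A minimal sketch of how an implementing processor can resolve its query through the readQuery default method above. The ExampleQueryProcessor shell and its "example-query" descriptor are hypothetical and not part of this change; only readQuery comes from the interface (OPERATION_MODE would typically be added to the processor's descriptor list as well):

    package org.apache.nifi.processors.mongodb;

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.expression.ExpressionLanguageScope;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;
    import org.apache.nifi.processor.util.JsonValidator;

    import java.io.IOException;

    public class ExampleQueryProcessor extends AbstractProcessor implements QueryHelper {
        static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
            .name("example-query")
            .displayName("Query")
            .description("Optional query; falls back to the flowfile content, then to {}.")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(JsonValidator.INSTANCE)
            .required(false)
            .build();

        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            FlowFile input = session.get();
            try {
                // The property wins; otherwise the flowfile content is exported
                // as the query; with no property and no input, "{}" is returned.
                final String query = readQuery(context, session, QUERY, input);
                getLogger().debug("Resolved query: {}", new Object[]{query});
            } catch (IOException e) {
                throw new ProcessException(e);
            }
            if (input != null) {
                session.remove(input); // a real processor would transfer it onward
            }
        }
    }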
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/AbstractGridFSProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/AbstractGridFSProcessor.java
new file mode 100644 (file)
index 0000000..48368b6
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.GridFSBuckets;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+import org.bson.types.ObjectId;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractGridFSProcessor extends AbstractProcessor {
+    static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
+        .name("gridfs-client-service")
+        .displayName("Client Service")
+        .description("The MongoDB client service to use for database connections.")
+        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .required(true)
+        .identifiesControllerService(MongoDBClientService.class)
+        .build();
+
+    static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
+        .name("gridfs-database-name")
+        .displayName("Mongo Database Name")
+        .description("The name of the database to use")
+        .required(true)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor.Builder()
+        .name("gridfs-bucket-name")
+        .displayName("Bucket Name")
+        .description("The GridFS bucket where the files will be stored. If left blank, it will use the default value 'fs' " +
+                "that the MongoDB client driver uses.")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(false)
+        .addValidator(Validator.VALID)
+        .build();
+
+    static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
+        .name("gridfs-file-name")
+        .displayName("File Name")
+        .description("The name of the file in the bucket that is the target of this processor.")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor QUERY_ATTRIBUTE = new PropertyDescriptor.Builder()
+        .name("mongo-query-attribute")
+        .displayName("Query Output Attribute")
+        .description("If set, the query will be written to a specified attribute on the output flowfiles.")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+        .required(false)
+        .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("When there is a failure processing the flowfile, it goes to this relationship.")
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("When the operation succeeds, the flowfile is sent to this relationship.")
+        .build();
+
+    static final List<PropertyDescriptor> PARENT_PROPERTIES;
+
+    static final Set<Relationship> PARENT_RELATIONSHIPS;
+
+    protected volatile MongoDBClientService clientService;
+
+    static {
+        List<PropertyDescriptor> _temp = new ArrayList<>();
+        _temp.add(CLIENT_SERVICE);
+        _temp.add(DATABASE_NAME);
+        _temp.add(BUCKET_NAME);
+        PARENT_PROPERTIES = Collections.unmodifiableList(_temp);
+
+        Set<Relationship> _rels = new HashSet<>();
+        _rels.add(REL_SUCCESS);
+        _rels.add(REL_FAILURE);
+        PARENT_RELATIONSHIPS = Collections.unmodifiableSet(_rels);
+    }
+
+    protected MongoDatabase getDatabase(FlowFile input, ProcessContext context) {
+        return clientService.getDatabase(context.getProperty(DATABASE_NAME)
+                .evaluateAttributeExpressions(input)
+                .getValue());
+    }
+
+    protected GridFSBucket getBucket(FlowFile input, ProcessContext context) {
+        final String name = getBucketName(input, context);
+        if (StringUtils.isEmpty(name)) {
+            return GridFSBuckets.create(getDatabase(input, context));
+        } else {
+            return GridFSBuckets.create(getDatabase(input, context), name);
+        }
+    }
+
+    protected String getBucketName(FlowFile input, ProcessContext context) {
+        return context.getProperty(BUCKET_NAME).isSet()
+            ? context.getProperty(BUCKET_NAME).evaluateAttributeExpressions(input).getValue()
+            : null;
+    }
+
+    protected String getTransitUri(ObjectId id, FlowFile input, ProcessContext context) {
+        String bucket = getBucketName(input, context);
+        String uri = clientService.getURI();
+        return new StringBuilder()
+            .append(uri)
+            .append(uri.endsWith("/") ? "" : "/")
+            .append(bucket)
+            .append("/")
+            .append(id.toString())
+            .toString();
+    }
+}
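
For reference, an illustrative result of getTransitUri above (host and id values are made up):

    // Assuming clientService.getURI() returns "mongodb://localhost:27017"
    // and Bucket Name resolves to "fs":
    getTransitUri(new ObjectId("5c63f0d2e4b0a1b2c3d4e5f6"), flowFile, context);
    // -> "mongodb://localhost:27017/fs/5c63f0d2e4b0a1b2c3d4e5f6"

Note that when Bucket Name is left unset, getBucketName returns null, so the transit URI will contain the literal segment "null" rather than the driver default "fs".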
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFS.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFS.java
new file mode 100644 (file)
index 0000000..680731b
--- /dev/null
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.model.GridFSFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+import org.bson.Document;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@CapabilityDescription("Deletes a file from GridFS using a file name or a query.")
+@Tags({"gridfs", "delete", "mongodb"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class DeleteGridFS extends AbstractGridFSProcessor {
+    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+        .name("delete-gridfs-query")
+        .displayName("Query")
+        .description("A valid MongoDB query to use to find and delete one or more files from GridFS.")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(JsonValidator.INSTANCE)
+        .required(false)
+        .build();
+
+    static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
+        .name("gridfs-file-name")
+        .displayName("File Name")
+        .description("The name of the file in the bucket that is the target of this processor. GridFS file names do not " +
+                "include path information because GridFS does not sort files into folders within a bucket.")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    static final List<PropertyDescriptor> DESCRIPTORS;
+
+    static {
+        List<PropertyDescriptor> _temp = new ArrayList<>();
+        _temp.addAll(PARENT_PROPERTIES);
+        _temp.add(FILE_NAME);
+        _temp.add(QUERY);
+        _temp.add(QUERY_ATTRIBUTE);
+        DESCRIPTORS = Collections.unmodifiableList(_temp);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return new HashSet<>(PARENT_RELATIONSHIPS);
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        ArrayList<ValidationResult> problems = new ArrayList<>();
+
+        boolean fileName = validationContext.getProperty(FILE_NAME).isSet();
+        boolean query    = validationContext.getProperty(QUERY).isSet();
+
+        if (fileName && query) {
+            problems.add(new ValidationResult.Builder()
+                .valid(false)
+                .explanation("File name and Query cannot be set at the same time.")
+                .build()
+            );
+        } else if (!fileName && !query) {
+            problems.add(new ValidationResult.Builder()
+                .valid(false)
+                .explanation("File name or Query must be set, but not both at the same time.")
+                .build()
+            );
+        }
+
+        return problems;
+    }
+
+    private String getQuery(ProcessContext context, FlowFile input) {
+        String queryString;
+        if (context.getProperty(FILE_NAME).isSet()) {
+            String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue();
+            queryString = String.format("{ \"filename\": \"%s\"}", fileName);
+        } else {
+            queryString = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
+        }
+
+        return queryString;
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        final String deleteQuery = getQuery(context, input);
+        final String queryAttribute = context.getProperty(QUERY_ATTRIBUTE).isSet()
+                ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
+                : null;
+        GridFSBucket bucket = getBucket(input, context);
+
+        try {
+            Document query = Document.parse(deleteQuery);
+            MongoCursor<GridFSFile> cursor = bucket.find(query).iterator();
+            if (cursor.hasNext()) {
+                GridFSFile file = cursor.next();
+                bucket.delete(file.getObjectId());
+
+                if (!StringUtils.isEmpty(queryAttribute)) {
+                    input = session.putAttribute(input, queryAttribute, deleteQuery);
+                }
+
+                session.transfer(input, REL_SUCCESS);
+            } else {
+                getLogger().error(String.format("Query %s did not delete anything in %s", deleteQuery, bucket.getBucketName()));
+                session.transfer(input, REL_FAILURE);
+            }
+
+            cursor.close();
+        } catch (Exception ex) {
+            getLogger().error(String.format("Error deleting using query: %s", deleteQuery), ex);
+            session.transfer(input, REL_FAILURE);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFS.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFS.java
new file mode 100644 (file)
index 0000000..11d4b87
--- /dev/null
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.model.GridFSFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.JsonValidator;
+import org.apache.nifi.processors.mongodb.QueryHelper;
+import org.apache.nifi.util.StringUtils;
+import org.bson.Document;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttributes(
+    @WritesAttribute(attribute = "gridfs.file.metadata", description = "The custom metadata stored with a file, if any, is written to this attribute.")
+)
+@Tags({"fetch", "gridfs", "mongo"})
+@CapabilityDescription("Retrieves one or more files from a GridFS bucket by file name or by a user-defined query.")
+public class FetchGridFS extends AbstractGridFSProcessor implements QueryHelper {
+
+    static final String METADATA_ATTRIBUTE = "gridfs.file.metadata";
+
+    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+        .name("gridfs-query")
+        .displayName("Query")
+        .description("A valid MongoDB query to use to fetch one or more files from GridFS.")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(JsonValidator.INSTANCE)
+        .required(false)
+        .build();
+
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+        .name("original")
+        .description("The original input flowfile goes to this relationship if the query does not cause an error")
+        .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+    static final Set<Relationship> RELATIONSHIP_SET;
+
+    static {
+        List<PropertyDescriptor> _temp = new ArrayList<>();
+        _temp.addAll(PARENT_PROPERTIES);
+        _temp.add(FILE_NAME);
+        _temp.add(QUERY);
+        _temp.add(QUERY_ATTRIBUTE);
+        _temp.add(OPERATION_MODE);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(_temp);
+
+        Set<Relationship> _rels = new HashSet<>();
+        _rels.addAll(PARENT_RELATIONSHIPS);
+        _rels.add(REL_ORIGINAL);
+        RELATIONSHIP_SET = Collections.unmodifiableSet(_rels);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIP_SET;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    private String getQuery(ProcessSession session, ProcessContext context, FlowFile input) throws IOException {
+        String queryString;
+        if (context.getProperty(FILE_NAME).isSet()) {
+            String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue();
+            queryString = String.format("{ \"filename\": \"%s\"}", fileName);
+        } else if (context.getProperty(QUERY).isSet()) {
+            queryString = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
+        } else {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            session.exportTo(input, out);
+            out.close();
+            queryString = new String(out.toByteArray());
+        }
+
+        return queryString;
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        final String operatingMode = context.getProperty(OPERATION_MODE).getValue();
+        final Map<String, String> originalAttributes = input.getAttributes();
+
+        String queryStr;
+        try {
+            queryStr = getQuery(session, context, input);
+            if (StringUtils.isEmpty(queryStr)) {
+                getLogger().error("No query could be found or built from the supplied input.");
+                session.transfer(input, REL_FAILURE);
+                return;
+            }
+        } catch (IOException ex) {
+            getLogger().error("No query could be found from supplied input", ex);
+            session.transfer(input, REL_FAILURE);
+            return;
+        }
+
+        Document query = Document.parse(queryStr);
+
+        try {
+            final GridFSBucket bucket = getBucket(input, context);
+            final String queryPtr = queryStr;
+            final FlowFile parent = operatingMode.equals(MODE_ONE_COMMIT.getValue()) ? input : null;
+
+            MongoCursor<GridFSFile> it = bucket.find(query).iterator();
+            if (operatingMode.equals(MODE_MANY_COMMITS.getValue())) {
+                session.transfer(input, REL_ORIGINAL);
+                input = null;
+            }
+
+            while (it.hasNext()) {
+                GridFSFile gridFSFile = it.next();
+                handleFile(bucket, session, context, parent, gridFSFile, queryPtr);
+
+                if (operatingMode.equals(MODE_MANY_COMMITS.getValue())) {
+                    session.commit();
+                }
+            }
+
+            if (input != null) {
+                session.transfer(input, REL_ORIGINAL);
+            }
+        } catch (Exception ex) {
+            getLogger().error("An error occurred wile trying to run the query.", ex);
+            if (input != null && operatingMode.equals(MODE_ONE_COMMIT.getValue())) {
+                session.transfer(input, REL_FAILURE);
+            } else if (input != null && operatingMode.equals(MODE_MANY_COMMITS.getValue())) {
+                final String queryPtr = queryStr;
+                FlowFile cloned = session.create();
+                cloned = session.putAllAttributes(cloned, originalAttributes);
+                cloned = session.write(cloned, out -> out.write(queryPtr.getBytes()));
+                session.transfer(cloned, REL_FAILURE);
+            }
+        }
+    }
+
+    private void handleFile(GridFSBucket bucket, ProcessSession session, ProcessContext context, FlowFile parent, GridFSFile input, String query) {
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(METADATA_ATTRIBUTE, input.getMetadata() != null ? input.getMetadata().toJson() : "{}");
+        if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
+            String key = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(parent).getValue();
+            attrs.put(key, query);
+        }
+        attrs.put(CoreAttributes.FILENAME.key(), input.getFilename());
+        FlowFile output = parent != null ? session.create(parent) : session.create();
+        output = session.write(output, out -> bucket.downloadToStream(input.getObjectId(), out));
+        output = session.putAllAttributes(output, attrs);
+        session.transfer(output, REL_SUCCESS);
+        session.getProvenanceReporter().receive(output, getTransitUri(input.getObjectId(), output, context));
+    }
+}
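
A hedged usage sketch of the streaming mode, following the pattern of the integration tests later in this change. GridFSITTestBase handles the client-service wiring; the bucket, file names, and contents are illustrative and not part of this commit:

    package org.apache.nifi.processors.mongodb.gridfs;

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    import java.util.HashMap;

    public class FetchGridFSStreamingExampleIT extends GridFSITTestBase {
        private TestRunner runner;
        private static final String BUCKET = "example_fetch_bucket";

        @Before
        public void setup() throws Exception {
            runner = TestRunners.newTestRunner(FetchGridFS.class);
            super.setup(runner, BUCKET, false);
        }

        @After
        public void tearDown() {
            super.tearDown();
        }

        @Test
        public void testStreamQueryResults() {
            writeTestFile("first.txt", "Hello", BUCKET, new HashMap<>());
            writeTestFile("second.txt", "World", BUCKET, new HashMap<>());

            // In streaming mode the input goes to "original" up front and each
            // fetched file is committed as soon as it has been written out.
            runner.setProperty(FetchGridFS.OPERATION_MODE, FetchGridFS.MODE_MANY_COMMITS.getValue());
            runner.setProperty(FetchGridFS.QUERY, "{}");
            runner.enqueue("");
            runner.run();

            runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
            runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 2);
            runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
        }
    }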
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFS.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFS.java
new file mode 100644 (file)
index 0000000..be9dd46
--- /dev/null
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.model.GridFSUploadOptions;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+import org.bson.Document;
+import org.bson.types.ObjectId;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"mongo", "gridfs", "put", "file", "store"})
+@CapabilityDescription("Writes a file to a GridFS bucket.")
+public class PutGridFS extends AbstractGridFSProcessor {
+
+    static final PropertyDescriptor PROPERTIES_PREFIX = new PropertyDescriptor.Builder()
+        .name("putgridfs-properties-prefix")
+        .displayName("File Properties Prefix")
+        .description("Attributes that have this prefix will be added to the file stored in GridFS as metadata.")
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(Validator.VALID)
+        .build();
+
+    static final AllowableValue NO_UNIQUE   = new AllowableValue("none", "None", "No uniqueness will be enforced.");
+    static final AllowableValue UNIQUE_NAME = new AllowableValue("name", "Name", "Only the filename must " +
+            "be unique.");
+    static final AllowableValue UNIQUE_HASH = new AllowableValue("hash", "Hash", "Only the file hash must be " +
+            "unique.");
+    static final AllowableValue UNIQUE_BOTH = new AllowableValue("both", "Both", "Both the filename and hash " +
+            "must be unique.");
+
+    static final PropertyDescriptor ENFORCE_UNIQUENESS = new PropertyDescriptor.Builder()
+        .name("putgridfs-enforce-uniqueness")
+        .displayName("Enforce Uniqueness")
+        .description("When enabled, this option will ensure that uniqueness is enforced on the bucket. It will do so by creating a MongoDB index " +
+                "that matches your selection. It should ideally be configured once when the bucket is created for the first time because " +
+                "it could take a long time to build on an existing bucket wit a lot of data.")
+        .allowableValues(NO_UNIQUE, UNIQUE_BOTH, UNIQUE_NAME, UNIQUE_HASH)
+        .defaultValue(NO_UNIQUE.getValue())
+        .required(true)
+        .build();
+    static final PropertyDescriptor HASH_ATTRIBUTE = new PropertyDescriptor.Builder()
+        .name("putgridfs-hash-attribute")
+        .displayName("Hash Attribute")
+        .description("If uniquness enforcement is enabled and the file hash is part of the constraint, this must be set to an attribute that " +
+                "exists on all incoming flowfiles.")
+        .defaultValue("hash.value")
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    static final PropertyDescriptor CHUNK_SIZE = new PropertyDescriptor.Builder()
+        .name("putgridfs-chunk-size")
+        .displayName("Chunk Size")
+        .description("Controls the maximum size of each chunk of a file uploaded into GridFS.")
+        .defaultValue("256 KB")
+        .required(true)
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor FILE_NAME = new PropertyDescriptor.Builder()
+        .name("gridfs-file-name")
+        .displayName("File Name")
+        .description("The name of the file in the bucket that is the target of this processor. GridFS file names do not " +
+                "include path information because GridFS does not sort files into folders within a bucket.")
+        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    static final Relationship REL_DUPLICATE = new Relationship.Builder()
+        .name("duplicate")
+        .description("Flowfiles that fail the duplicate check are sent to this relationship.")
+        .build();
+
+    static final String ID_ATTRIBUTE = "gridfs.id";
+
+    static final List<PropertyDescriptor> DESCRIPTORS;
+    static final Set<Relationship> RELATIONSHIP_SET;
+
+    static {
+        List<PropertyDescriptor> _temp = new ArrayList<>();
+        _temp.addAll(PARENT_PROPERTIES);
+        _temp.add(FILE_NAME);
+        _temp.add(PROPERTIES_PREFIX);
+        _temp.add(ENFORCE_UNIQUENESS);
+        _temp.add(HASH_ATTRIBUTE);
+        _temp.add(CHUNK_SIZE);
+        DESCRIPTORS = Collections.unmodifiableList(_temp);
+
+        Set<Relationship> _rels = new HashSet<>();
+        _rels.addAll(PARENT_RELATIONSHIPS);
+        _rels.add(REL_DUPLICATE);
+        RELATIONSHIP_SET = Collections.unmodifiableSet(_rels);
+    }
+
+    private String uniqueness;
+    private String hashAttribute;
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        this.uniqueness = context.getProperty(ENFORCE_UNIQUENESS).getValue();
+        this.hashAttribute = context.getProperty(HASH_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+        this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIP_SET;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        GridFSBucket bucket = getBucket(input, context);
+
+        if (!canUploadFile(context, input, bucket.getBucketName())) {
+            getLogger().error("Cannot upload the file because of the uniqueness policy configured.");
+            session.transfer(input, REL_DUPLICATE);
+            return;
+        }
+
+        final int chunkSize = context.getProperty(CHUNK_SIZE).evaluateAttributeExpressions(input).asDataSize(DataUnit.B).intValue();
+
+        try (InputStream fileInput = session.read(input)) {
+            String fileName = context.getProperty(FILE_NAME).evaluateAttributeExpressions(input).getValue();
+            GridFSUploadOptions options = new GridFSUploadOptions()
+                .chunkSizeBytes(chunkSize)
+                .metadata(getMetadata(input, context));
+            ObjectId id = bucket.uploadFromStream(fileName, fileInput, options);
+            fileInput.close();
+
+            if (id != null) {
+                input = session.putAttribute(input, ID_ATTRIBUTE, id.toString());
+                session.transfer(input, REL_SUCCESS);
+                session.getProvenanceReporter().send(input, getTransitUri(id, input, context));
+            } else {
+                getLogger().error("ID was null, assuming failure.");
+                session.transfer(input, REL_FAILURE);
+            }
+        } catch (Exception ex) {
+            getLogger().error("Failed to upload file", ex);
+            session.transfer(input, REL_FAILURE);
+        }
+    }
+
+    private boolean canUploadFile(ProcessContext context, FlowFile input, String bucketName) {
+        boolean retVal;
+
+        if (uniqueness.equals(NO_UNIQUE.getValue())) {
+            retVal = true;
+        } else {
+            final String fileName = input.getAttribute(CoreAttributes.FILENAME.key());
+            final String fileColl = String.format("%s.files", bucketName);
+            final String hash     = input.getAttribute(hashAttribute);
+
+            if ((uniqueness.equals(UNIQUE_BOTH.getValue()) || uniqueness.equals(UNIQUE_HASH.getValue())) && StringUtils.isEmpty(hash)) {
+                throw new RuntimeException(String.format("Uniqueness mode %s was set and the hash attribute %s was not found.", uniqueness, hashAttribute));
+            }
+
+            Document query;
+            if (uniqueness.equals(UNIQUE_BOTH.getValue())) {
+                query = new Document().append("filename", fileName).append("md5", hash);
+            } else if (uniqueness.equals(UNIQUE_HASH.getValue())) {
+                query = new Document().append("md5", hash);
+            } else {
+                query = new Document().append("filename", fileName);
+            }
+
+            retVal = getDatabase(input, context).getCollection(fileColl).count(query) == 0;
+        }
+
+        return retVal;
+    }
+
+    private Document getMetadata(FlowFile input, ProcessContext context) {
+        final String prefix = context.getProperty(PROPERTIES_PREFIX).evaluateAttributeExpressions(input).getValue();
+        Document doc;
+
+        if (StringUtils.isEmpty(prefix)) {
+            doc = Document.parse("{}");
+        } else {
+            doc = new Document();
+            Map<String, String> attributes = input.getAttributes();
+            for (Map.Entry<String, String> entry : attributes.entrySet()) {
+                if (entry.getKey().startsWith(prefix)) {
+                    String cleanPrefix = prefix.endsWith(".") ? prefix : String.format("%s.", prefix);
+                    String cleanKey = entry.getKey().replace(cleanPrefix, "");
+                    doc.append(cleanKey, entry.getValue());
+                }
+            }
+        }
+
+        return doc;
+    }
+}
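
An illustrative walk-through of getMetadata above (attribute names are made up). With File Properties Prefix set to "meta", the prefix is normalized to "meta." and stripped from each matching attribute key, so a flowfile with the attributes on the left is stored with the metadata document on the right; attributes without the prefix are skipped:

    meta.author = jane          ->   { "author" : "jane",
    meta.department = eng       ->     "department" : "eng" }
    filename = report.txt       ->   (skipped)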
index bfe2b74..3797ca0 100644 (file)
@@ -18,4 +18,7 @@ org.apache.nifi.processors.mongodb.GetMongo
 org.apache.nifi.processors.mongodb.GetMongoRecord
 org.apache.nifi.processors.mongodb.RunMongoAggregation
 org.apache.nifi.processors.mongodb.PutMongo
-org.apache.nifi.processors.mongodb.PutMongoRecord
\ No newline at end of file
+org.apache.nifi.processors.mongodb.PutMongoRecord
+org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS
+org.apache.nifi.processors.mongodb.gridfs.FetchGridFS
+org.apache.nifi.processors.mongodb.gridfs.PutGridFS
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.DeleteGridFS/additionalDetails.html
new file mode 100644 (file)
index 0000000..b748755
--- /dev/null
@@ -0,0 +1,32 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>DeleteGridFS</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<!-- Processor Documentation ================================================== -->
+<h2>Description:</h2>
+<p>
+    This processor deletes one or more files from GridFS. The query to execute can either be provided in the query
+    configuration parameter or generated from the value pulled from the filename configuration parameter. Upon successful
+    execution, it will append the query that was executed as an attribute on the flowfile that was processed.
+</p>
+</body>
+</html>
\ No newline at end of file
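As an illustration of the two targeting styles (values made up): setting File Name to ${filename} deletes the file named by the incoming flowfile's filename attribute, which is equivalent to supplying the first query below, while an explicit Query can match on any stored field:

    Generated from File Name:  { "filename": "my-file.txt" }
    User-supplied Query:       { "metadata.expired": true }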
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.FetchGridFS/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.FetchGridFS/additionalDetails.html
new file mode 100644 (file)
index 0000000..279216c
--- /dev/null
@@ -0,0 +1,43 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>FetchGridFS</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<!-- Processor Documentation ================================================== -->
+<h2>Description:</h2>
+<p>
+    This processor retrieves one or more files from GridFS. The query can be provided in one of three ways:
+</p>
+
+<ul>
+    <li>Query configuration parameter.</li>
+    <li>Built for you by configuring the filename parameter. (Note: this is just a filename, Mongo queries cannot be
+    embedded in the field).</li>
+    <li>Retrieving the query from the flowfile contents.</li>
+</ul>
+
+<p>
+    The processor can also be configured to commit either only once at the end of a fetch operation or after each file
+    that is retrieved. Multiple commits are generally only necessary when retrieving a lot of data from GridFS, as measured
+    in total data size rather than file count, to ensure that the disks NiFi is using are not overloaded.
+</p>
+</body>
+</html>
\ No newline at end of file
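For example (file name made up), setting File Name to ${filename} causes the processor to build the query below on the user's behalf; the same JSON could instead be supplied in the Query property or as the flowfile content:

    { "filename": "my-file.txt" }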
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.PutGridFS/additionalDetails.html b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/resources/docs/org.apache.nifi.processors.mongodb.gridfs.PutGridFS/additionalDetails.html
new file mode 100644 (file)
index 0000000..62330dd
--- /dev/null
@@ -0,0 +1,58 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutGridFS</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<!-- Processor Documentation ================================================== -->
+<h2>Description:</h2>
+<p>
+    This processor puts a file with one or more user-defined metadata values into GridFS in the configured bucket. It
+    allows the user to define how big each file chunk will be during ingestion and provides the ability to enforce file
+    uniqueness using file name or hash values instead of relying solely on a database index.
+</p>
+<h3>GridFS File Attributes</h3>
+<p>
+    <em>PutGridFS</em> allows for flowfile attributes that start with a configured prefix to be added to the GridFS
+    document. These can be very useful later when working with GridFS for providing metadata about a file.
+</p>
+<h3>Chunk Size</h3>
+<p>
+    GridFS splits a file into chunks stored as Mongo documents as the file is ingested into the database. The chunk size
+    configuration parameter configures the maximum size of each chunk. This field should be left at its default value
+    unless there is a specific business case to increase or decrease it.
+</p>
+<h3>Uniqueness Enforcement</h3>
+<p>
+    There are four operating modes:
+</p>
+<ul>
+    <li>No enforcement at the application level.</li>
+    <li>Enforce by unique file name.</li>
+    <li>Enforce by unique hash value.</li>
+    <li>Use both hash and file name.</li>
+</ul>
+<p>
+    The hash value by default is taken from the attribute <em>hash.value</em>, which can be generated by configuring a
+    <em>HashContent</em> processor upstream of <em>PutGridFS</em>. Both this and the name option use a query on the existing
+    data to see if a file matching those criteria exists before attempting to write the flowfile contents.
+</p>
+</body>
+</html>
\ No newline at end of file
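For reference, hedged sketches of the duplicate-check queries run against the <bucket>.files collection for each enforcement mode (field values made up):

    Name only:      { "filename": "report.txt" }
    Hash only:      { "md5": "0cc175b9c0f1b6a831c399e269772661" }
    Name and hash:  { "filename": "report.txt", "md5": "0cc175b9c0f1b6a831c399e269772661" }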
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFSIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/DeleteGridFSIT.java
new file mode 100644 (file)
index 0000000..e006ecb
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.bson.types.ObjectId;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DeleteGridFSIT extends GridFSITTestBase {
+    private TestRunner runner;
+    private static final String BUCKET = "delete_test_bucket";
+
+    @Before
+    public void setup() throws Exception {
+        runner = TestRunners.newTestRunner(DeleteGridFS.class);
+        super.setup(runner, BUCKET, false);
+    }
+
+    @After
+    public void tearDown() {
+        super.tearDown();
+    }
+
+    @Test
+    public void testFileAndQueryAtSameTime() {
+        runner.setProperty(DeleteGridFS.FILE_NAME, "${test_var}");
+        runner.setProperty(DeleteGridFS.QUERY, "{}");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testNeitherFileNorQuery() {
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testDeleteByFileName() {
+        testDeleteByProperty(DeleteGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()), setupTestFile());
+    }
+
+    @Test
+    public void testDeleteByQuery() {
+        testDeleteByProperty(DeleteGridFS.QUERY, "{}", setupTestFile());
+    }
+
+    @Test
+    public void testQueryAttribute() {
+        String attrName = "gridfs.query.used";
+        String fileName = setupTestFile();
+        runner.setProperty(DeleteGridFS.QUERY_ATTRIBUTE, attrName);
+        testDeleteByProperty(DeleteGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()), fileName);
+        testForQueryAttribute(fileName, attrName);
+    }
+
+    private void testForQueryAttribute(String mustContain, String attrName) {
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteGridFS.REL_SUCCESS);
+        String attribute = flowFiles.get(0).getAttribute(attrName);
+        Assert.assertTrue(attribute.contains(mustContain));
+    }
+
+    private String setupTestFile() {
+        String fileName = "simple-delete-test.txt";
+        ObjectId id = writeTestFile(fileName, "Hello, world!", BUCKET, new HashMap<>());
+        Assert.assertNotNull(id);
+
+        return fileName;
+    }
+
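+    // Deletes using the supplied property (file name or query), then verifies the
+    // flowfile was routed to success and the file is gone from the bucket.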
+    private void testDeleteByProperty(PropertyDescriptor descriptor, String value, String fileName) {
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+        runner.setProperty(descriptor, value);
+        runner.assertValid();
+        runner.enqueue("test", attrs);
+        runner.run();
+
+        runner.assertTransferCount(DeleteGridFS.REL_FAILURE, 0);
+        runner.assertTransferCount(DeleteGridFS.REL_SUCCESS, 1);
+
+        Assert.assertFalse(String.format("File %s still exists.", fileName), fileExists(fileName, BUCKET));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFSIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/FetchGridFSIT.java
new file mode 100644 (file)
index 0000000..5ce4ff3
--- /dev/null
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processors.mongodb.QueryHelper;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.bson.types.ObjectId;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
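+/**
+ * Integration tests for FetchGridFS covering lookup by file name, by query (from a
+ * property or from the flowfile body), multi-file fetches under both operation
+ * modes, and recording the executed query in a flowfile attribute.
+ */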
+public class FetchGridFSIT extends GridFSITTestBase {
+    TestRunner runner;
+
+    static final String BUCKET = "get_test_bucket";
+
+    @Before
+    public void setup() throws Exception {
+        runner = TestRunners.newTestRunner(FetchGridFS.class);
+        super.setup(runner, BUCKET, false);
+    }
+
+    @After
+    public void tearDown() {
+        super.tearDown();
+    }
+
+    @Test
+    public void testGetOneByName() {
+        final String fileName = "get_by_name.txt";
+        final String content  = "Hello, world";
+        ObjectId id = writeTestFile(fileName, content, BUCKET, new HashMap<>());
+        Assert.assertNotNull(id);
+
+        String query = String.format("{\"filename\": \"%s\"}", fileName);
+        runner.enqueue(query);
+        runner.run();
+        runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
+        runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
+        runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS);
+        byte[] rawData = runner.getContentAsByteArray(flowFiles.get(0));
+        Assert.assertEquals("Data did not match for the file", new String(rawData), content);
+
+        runner.clearTransferState();
+        runner.setProperty(FetchGridFS.QUERY, query);
+        runner.enqueue("test");
+        runner.run();
+
+        runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
+        runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
+        runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1);
+        flowFiles = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS);
+        rawData = runner.getContentAsByteArray(flowFiles.get(0));
+        Assert.assertEquals("Data did not match for the file", new String(rawData), content);
+    }
+
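+    // Both operation modes should route the same number of flowfiles; only the
+    // session commit behavior differs between them.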
+    @Test
+    public void testGetMany() {
+        String baseName = "test_file_%d.txt";
+        String content  = "Hello, world take %d";
+        for (int index = 0; index < 5; index++) {
+            ObjectId id = writeTestFile(String.format(baseName, index), String.format(content, index), BUCKET, new HashMap<>());
+            Assert.assertNotNull(id);
+        }
+
+        AllowableValue[] values = new AllowableValue[] { QueryHelper.MODE_MANY_COMMITS, QueryHelper.MODE_ONE_COMMIT };
+
+        for (AllowableValue value : values) {
+            String query = "{}";
+            runner.setProperty(FetchGridFS.OPERATION_MODE, value);
+            runner.enqueue(query);
+            runner.run();
+
+            runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
+            runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
+            runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 5);
+            runner.clearTransferState();
+        }
+    }
+
+    @Test
+    public void testQueryAttribute() {
+        final String fileName = "get_by_name.txt";
+        final String content  = "Hello, world";
+        ObjectId id = writeTestFile(fileName, content, BUCKET, new HashMap<>());
+        Assert.assertNotNull(id);
+
+        final String queryAttr = "gridfs.query.used";
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+        runner.setProperty(FetchGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()));
+        runner.setProperty(FetchGridFS.QUERY_ATTRIBUTE, queryAttr);
+        runner.enqueue(content, attrs);
+        runner.run();
+
+        runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
+        runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
+        runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1);
+        MockFlowFile mff = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS).get(0);
+        String attr = mff.getAttribute(queryAttr);
+        Assert.assertNotNull("Query attribute was null.", attr);
+        Assert.assertTrue("Wrong content.", attr.contains("filename"));
+
+        runner.clearTransferState();
+
+        id = writeTestFile(fileName, content, BUCKET, new HashMap<String, Object>(){{
+            put("lookupKey", "xyz");
+        }});
+        Assert.assertNotNull(id);
+
+        String query = "{ \"metadata\": { \"lookupKey\": \"xyz\" }}";
+
+        runner.removeProperty(FetchGridFS.FILE_NAME);
+        runner.setProperty(FetchGridFS.QUERY, query);
+        runner.enqueue(content, attrs);
+        runner.run();
+        runner.assertTransferCount(FetchGridFS.REL_FAILURE, 0);
+        runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, 1);
+        runner.assertTransferCount(FetchGridFS.REL_SUCCESS, 1);
+        mff = runner.getFlowFilesForRelationship(FetchGridFS.REL_SUCCESS).get(0);
+        attr = mff.getAttribute(queryAttr);
+        Assert.assertNotNull("Query attribute was null.", attr);
+        Assert.assertTrue("Wrong content.", attr.contains("metadata"));
+    }
+
+    @Test
+    public void testGetQueryFromBody() {
+        runner.enqueue("{}");
+        testQueryFromSource(0, 1, 1);
+    }
+
+    @Test
+    public void testGetQueryFromQueryParam() {
+        runner.setProperty(FetchGridFS.QUERY, "{}");
+        runner.enqueue("");
+        testQueryFromSource(0, 1, 1);
+    }
+
+    @Test
+    public void testGetQueryFromFileNameParam() {
+        Map<String, String> attr = new HashMap<>();
+        attr.put(CoreAttributes.FILENAME.key(), "get_by_name.txt");
+        runner.setProperty(FetchGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()));
+        runner.enqueue("test", attr);
+        testQueryFromSource(0, 1, 1);
+    }
+
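+    // Writes a known file, runs the preconfigured runner, and checks the transfer
+    // counts for the failure, original, and success relationships.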
+    private void testQueryFromSource(int failure, int original, int success) {
+        final String fileName = "get_by_name.txt";
+        final String content  = "Hello, world";
+        ObjectId id = writeTestFile(fileName, content, BUCKET, new HashMap<>());
+        Assert.assertNotNull(id);
+
+        runner.run();
+        runner.assertTransferCount(FetchGridFS.REL_FAILURE, failure);
+        runner.assertTransferCount(FetchGridFS.REL_ORIGINAL, original);
+        runner.assertTransferCount(FetchGridFS.REL_SUCCESS, success);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/GridFSITTestBase.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/GridFSITTestBase.java
new file mode 100644 (file)
index 0000000..45e7cb2
--- /dev/null
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.gridfs.GridFSBucket;
+import com.mongodb.client.gridfs.GridFSBuckets;
+import com.mongodb.client.gridfs.model.GridFSFile;
+import com.mongodb.client.gridfs.model.GridFSUploadOptions;
+import org.apache.nifi.mongodb.MongoDBClientService;
+import org.apache.nifi.mongodb.MongoDBControllerService;
+import org.apache.nifi.util.TestRunner;
+import org.bson.Document;
+import org.bson.types.ObjectId;
+
+import java.io.ByteArrayInputStream;
+import java.util.Map;
+
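+/**
+ * Base class for the GridFS integration tests. Configures a MongoDBControllerService
+ * against a MongoDB instance expected at mongodb://localhost:27017 and provides
+ * helpers for writing and inspecting files in a GridFS bucket.
+ */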
+public class GridFSITTestBase {
+    static final String URI = "mongodb://localhost:27017";
+    static final String DB  = "gridfs_test_database";
+    MongoClient client;
+
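+    // The three-argument overload lets a test defer validation when it still has
+    // required processor properties to set after setup.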
+    public void setup(TestRunner runner, String bucketName) throws Exception {
+        setup(runner, bucketName, true);
+    }
+
+    public void setup(TestRunner runner, String bucketName, boolean validate) throws Exception {
+        MongoDBClientService clientService = new MongoDBControllerService();
+        runner.addControllerService("clientService", clientService);
+        runner.setProperty(AbstractGridFSProcessor.CLIENT_SERVICE, "clientService");
+        runner.setProperty(clientService, MongoDBControllerService.URI, URI);
+        runner.setProperty(AbstractGridFSProcessor.BUCKET_NAME, bucketName);
+        runner.setProperty(AbstractGridFSProcessor.DATABASE_NAME, DB);
+        runner.enableControllerService(clientService);
+        runner.setValidateExpressionUsage(true);
+        if (validate) {
+            runner.assertValid();
+        }
+
+        client = new MongoClient("localhost", 27017);
+    }
+
+    public void tearDown() {
+        client.dropDatabase(DB);
+        client.close();
+    }
+
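+    // Returns true if at least one file with the given name exists in the bucket.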
+    public boolean fileExists(String name, String bucketName) {
+        GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
+        MongoCursor<GridFSFile> it = bucket.find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator();
+        boolean retVal = it.hasNext();
+        it.close();
+
+        return retVal;
+    }
+
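+    // Uploads the content as a GridFS file with the supplied metadata and returns
+    // the ObjectId assigned by the upload.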
+    public ObjectId writeTestFile(String fileName, String content, String bucketName, Map<String, Object> attrs) {
+        GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
+        GridFSUploadOptions options = new GridFSUploadOptions().metadata(new Document(attrs));
+        ByteArrayInputStream input = new ByteArrayInputStream(content.getBytes());
+        ObjectId retVal = bucket.uploadFromStream(fileName, input, options);
+
+        return retVal;
+    }
+
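+    // Returns true only if the named file exists and its metadata contains exactly
+    // the expected key/value pairs.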
+    public boolean fileHasProperties(String name, String bucketName, Map<String, String> attrs) {
+        GridFSBucket bucket = GridFSBuckets.create(client.getDatabase(DB), bucketName);
+        MongoCursor<GridFSFile> it = bucket.find(Document.parse(String.format("{ \"filename\": \"%s\" }", name))).iterator();
+        boolean retVal = false;
+
+        if (it.hasNext()) {
+            GridFSFile file = it.next();
+            Document metadata = file.getMetadata();
+            if (metadata != null && metadata.size() == attrs.size()) {
+                retVal = true;
+                for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+                    Object val = attrs.get(entry.getKey());
+                    if (val == null || !entry.getValue().equals(val)) {
+                        retVal = false;
+                        break;
+                    }
+                }
+            }
+        }
+
+        it.close();
+
+        return retVal;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFSIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/gridfs/PutGridFSIT.java
new file mode 100644 (file)
index 0000000..dfd7ae0
--- /dev/null
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.mongodb.gridfs;
+
+import com.mongodb.client.MongoCollection;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.bson.Document;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
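+/**
+ * Integration tests for PutGridFS covering simple puts, user-supplied metadata
+ * properties, uniqueness enforcement (name, hash, or both), and chunk sizing.
+ */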
+public class PutGridFSIT extends GridFSITTestBase {
+    TestRunner runner;
+
+    static final String BUCKET = "put_test_bucket";
+
+    @Before
+    public void setup() throws Exception {
+        runner = TestRunners.newTestRunner(PutGridFS.class);
+        runner.setProperty(PutGridFS.FILE_NAME, String.format("${%s}", CoreAttributes.FILENAME.key()));
+        super.setup(runner, BUCKET);
+    }
+
+    @After
+    public void tearDown() {
+        super.tearDown();
+    }
+
+    @Test
+    public void testSimplePut() {
+        final String fileName = "simple_test.txt";
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+
+        runner.enqueue("12345", attrs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutGridFS.REL_SUCCESS);
+
+        Assert.assertTrue("File does not exist", fileExists(fileName, BUCKET));
+    }
+
+    @Test
+    public void testWithProperties() {
+        final String fileName = "simple_test_props.txt";
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+        attrs.put("prop.created_by", "john.smith");
+        attrs.put("prop.created_for", "jane.doe");
+        attrs.put("prop.restrictions", "PHI&PII");
+        attrs.put("prop.department", "Accounting");
+
+        runner.setProperty(PutGridFS.PROPERTIES_PREFIX, "prop");
+        runner.enqueue("12345", attrs);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutGridFS.REL_SUCCESS);
+
+        attrs = new HashMap<String, String>(){{
+            put("created_by", "john.smith");
+            put("created_for", "jane.doe");
+            put("restrictions", "PHI&PII");
+            put("department", "Accounting");
+        }};
+
+        Assert.assertTrue("File does not exist", fileExists(fileName, BUCKET));
+        Assert.assertTrue("File is missing PARENT_PROPERTIES", fileHasProperties(fileName, BUCKET, attrs));
+    }
+
+    @Test
+    public void testNoUniqueness() {
+        String fileName = "test_duplicates.txt";
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+
+        for (int x = 0; x < 10; x++) {
+            runner.enqueue("Duplicates are ok.", attrs);
+            runner.run();
+        }
+
+        runner.assertTransferCount(PutGridFS.REL_SUCCESS, 10);
+
+        String bucketName = String.format("%s.files", BUCKET);
+        MongoCollection files = client.getDatabase(DB).getCollection(bucketName);
+        Document query = Document.parse(String.format("{\"filename\": \"%s\"}", fileName));
+        long count = files.count(query);
+        Assert.assertTrue("Wrong count", count == 10);
+    }
+
+    @Test
+    public void testFileNameUniqueness() {
+        String fileName = "test_duplicates.txt";
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+        testUniqueness(attrs, "Hello, world", PutGridFS.UNIQUE_NAME);
+    }
+
+    @Test
+    public void testFileNameAndHashUniqueness() {
+        testHashUniqueness(PutGridFS.UNIQUE_BOTH);
+    }
+
+    @Test
+    public void testHashUniqueness() {
+        testHashUniqueness(PutGridFS.UNIQUE_HASH);
+    }
+
+    @Test
+    public void testChunkSize() {
+        String[] chunkSizes = new String[] { "128 KB", "256 KB", "384 KB", "512 KB", "768 KB", "1024 KB" };
+        StringBuilder sb = new StringBuilder();
+        for (int x = 0; x < 10000; x++) {
+            sb.append("This is a test string used to build up a largish text file.");
+        }
+        final String testData = sb.toString();
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), "big-putgridfs-test-file.txt");
+
+        for (String chunkSize : chunkSizes) {
+            runner.setProperty(PutGridFS.CHUNK_SIZE, chunkSize);
+            runner.enqueue(testData, attrs);
+            runner.run();
+            runner.assertTransferCount(PutGridFS.REL_FAILURE, 0);
+            runner.assertTransferCount(PutGridFS.REL_DUPLICATE, 0);
+            runner.assertTransferCount(PutGridFS.REL_SUCCESS, 1);
+
+            runner.clearTransferState();
+        }
+
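+        // The chunk size should also resolve through Expression Language from a
+        // flowfile attribute.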
+        runner.setProperty(PutGridFS.CHUNK_SIZE, "${gridfs.chunk.size}");
+        attrs.put("gridfs.chunk.size", "768 KB");
+        runner.enqueue(testData, attrs);
+        runner.run();
+        runner.assertTransferCount(PutGridFS.REL_FAILURE, 0);
+        runner.assertTransferCount(PutGridFS.REL_DUPLICATE, 0);
+        runner.assertTransferCount(PutGridFS.REL_SUCCESS, 1);
+    }
+
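+    // Builds attributes carrying an MD5 hash of the content and delegates to
+    // testUniqueness with the given uniqueness mode.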
+    private void testHashUniqueness(AllowableValue value) {
+        String hashAttr = "hash.value";
+        String fileName = "test_duplicates.txt";
+        String content  = "Hello, world";
+        String hash     = DigestUtils.md5Hex(content);
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(CoreAttributes.FILENAME.key(), fileName);
+        attrs.put(hashAttr, hash);
+        testUniqueness(attrs, content, value);
+    }
+
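+    // Enqueues identical content five times with uniqueness enforcement enabled;
+    // only the first put should succeed and the remaining four should be routed
+    // to the duplicate relationship.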
+    private void testUniqueness(Map<String, String> attrs, String content, AllowableValue param) {
+        runner.setProperty(PutGridFS.ENFORCE_UNIQUENESS, param);
+        for (int x = 0; x < 5; x++) {
+            runner.enqueue(content, attrs);
+            runner.run();
+        }
+
+        runner.assertTransferCount(PutGridFS.REL_FAILURE, 0);
+        runner.assertTransferCount(PutGridFS.REL_DUPLICATE, 4);
+        runner.assertTransferCount(PutGridFS.REL_SUCCESS, 1);
+    }
+}
index c731fe4..6fdebf5 100644 (file)
@@ -40,8 +40,8 @@ import java.util.List;
 
 @Tags({"mongo", "mongodb", "service"})
 @CapabilityDescription(
-    "Provides a controller service that configures a connection to MongoDB and provides access to that connection to " +
-    "other Mongo-related components."
+        "Provides a controller service that configures a connection to MongoDB and provides access to that connection to " +
+                "other Mongo-related components."
 )
 public class MongoDBControllerService extends AbstractControllerService implements MongoDBClientService {
     private String uri;