NIFI-5920: Tagging an object in S3
authorStephen Goodman <sgoodman@eitccorp.com>
Sun, 30 Dec 2018 00:10:38 +0000 (19:10 -0500)
committerKoji Kawamura <ijokarumawak@apache.org>
Tue, 8 Jan 2019 03:12:13 +0000 (12:12 +0900)
Unit tests and functionality for tagging an object in S3.

Set FlowFile attributes directly from tags retrieved from S3.

Add guard clauses to ensure evaluated properties are not blank.

This closes #3239.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java [new file with mode: 0644]
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITTagS3Object.java [new file with mode: 0644]
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java [new file with mode: 0644]

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
new file mode 100644 (file)
index 0000000..6c9d72a
--- /dev/null
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
+import com.amazonaws.services.s3.model.GetObjectTaggingResult;
+import com.amazonaws.services.s3.model.ObjectTagging;
+import com.amazonaws.services.s3.model.SetObjectTaggingRequest;
+import com.amazonaws.services.s3.model.Tag;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+
+@SupportsBatching
+@WritesAttributes({
+        @WritesAttribute(attribute = "s3.tag.___", description = "The tags associated with the S3 object will be " +
+                "written as part of the FlowFile attributes")})
+@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
+@Tags({"Amazon", "S3", "AWS", "Archive", "Tag"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Sets tags on a FlowFile within an Amazon S3 Bucket. " +
+        "If attempting to tag a file that does not exist, FlowFile is routed to success.")
+public class TagS3Object extends AbstractS3Processor {
+
+    public static final PropertyDescriptor TAG_KEY = new PropertyDescriptor.Builder()
+            .name("tag-key")
+            .displayName("Tag Key")
+            .description("The key of the tag that will be set on the S3 Object")
+            .addValidator(new StandardValidators.StringLengthValidator(1, 127))
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor TAG_VALUE = new PropertyDescriptor.Builder()
+            .name("tag-value")
+            .displayName("Tag Value")
+            .description("The value of the tag that will be set on the S3 Object")
+            .addValidator(new StandardValidators.StringLengthValidator(1, 255))
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor APPEND_TAG = new PropertyDescriptor.Builder()
+            .name("append-tag")
+            .displayName("Append Tag")
+            .description("If set to true, the tag will be appended to the existing set of tags on the S3 object. " +
+                    "Any existing tags with the same key as the new tag will be updated with the specified value. If " +
+                    "set to false, the existing tags will be removed and the new tag will be set on the S3 object.")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
+            .name("version")
+            .displayName("Version ID")
+            .description("The Version of the Object to tag")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(KEY, BUCKET, VERSION_ID, TAG_KEY, TAG_VALUE, APPEND_TAG, ACCESS_KEY, SECRET_KEY,
+                    CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, SSL_CONTEXT_SERVICE,
+                    ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT,
+                    PROXY_USERNAME, PROXY_PASSWORD));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String newTagKey = context.getProperty(TAG_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String newTagVal = context.getProperty(TAG_VALUE).evaluateAttributeExpressions(flowFile).getValue();
+
+        if(StringUtils.isBlank(bucket)){
+            failFlowWithBlankEvaluatedProperty(session, flowFile, BUCKET);
+            return;
+        }
+
+        if(StringUtils.isBlank(key)){
+            failFlowWithBlankEvaluatedProperty(session, flowFile, KEY);
+            return;
+        }
+
+        if(StringUtils.isBlank(newTagKey)){
+            failFlowWithBlankEvaluatedProperty(session, flowFile, TAG_KEY);
+            return;
+        }
+
+        if(StringUtils.isBlank(newTagVal)){
+            failFlowWithBlankEvaluatedProperty(session, flowFile, TAG_VALUE);
+            return;
+        }
+
+        final String version = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
+
+        final AmazonS3 s3 = getClient();
+
+        SetObjectTaggingRequest r;
+        List<Tag> tags = new ArrayList<>();
+
+        try {
+            if(context.getProperty(APPEND_TAG).asBoolean()) {
+                final GetObjectTaggingRequest gr = new GetObjectTaggingRequest(bucket, key);
+                GetObjectTaggingResult res = s3.getObjectTagging(gr);
+
+                // preserve tags on S3 object, but filter out existing tag keys that match the one we're setting
+                tags = res.getTagSet().stream().filter(t -> !t.getKey().equals(newTagKey)).collect(Collectors.toList());
+            }
+
+            tags.add(new Tag(newTagKey, newTagVal));
+
+            if(StringUtils.isBlank(version)){
+                r = new SetObjectTaggingRequest(bucket, key, new ObjectTagging(tags));
+            } else{
+                r = new SetObjectTaggingRequest(bucket, key, version, new ObjectTagging(tags));
+            }
+            s3.setObjectTagging(r);
+        } catch (final AmazonServiceException ase) {
+            getLogger().error("Failed to tag S3 Object for {}; routing to failure", new Object[]{flowFile, ase});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        flowFile = setTagAttributes(session, flowFile, tags);
+
+        session.transfer(flowFile, REL_SUCCESS);
+        final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully tagged S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
+    }
+
+    private void failFlowWithBlankEvaluatedProperty(ProcessSession session, FlowFile flowFile, PropertyDescriptor pd) {
+        getLogger().error("{} value is blank after attribute expression language evaluation", new Object[]{pd.getName()});
+        flowFile = session.penalize(flowFile);
+        session.transfer(flowFile, REL_FAILURE);
+    }
+
+    private FlowFile setTagAttributes(ProcessSession session, FlowFile flowFile, List<Tag> tags) {
+        flowFile = session.removeAllAttributes(flowFile, Pattern.compile("^s3\\.tag\\..*"));
+
+        final Map<String, String> tagAttrs = new HashMap<>();
+        tags.stream().forEach(t -> tagAttrs.put("s3.tag." + t.getKey(), t.getValue()));
+        flowFile = session.putAllAttributes(flowFile, tagAttrs);
+        return flowFile;
+    }
+}
index c628fb1..68dd72a 100644 (file)
@@ -15,6 +15,7 @@
 org.apache.nifi.processors.aws.s3.FetchS3Object\r
 org.apache.nifi.processors.aws.s3.PutS3Object\r
 org.apache.nifi.processors.aws.s3.DeleteS3Object\r
+org.apache.nifi.processors.aws.s3.TagS3Object\r
 org.apache.nifi.processors.aws.s3.ListS3\r
 org.apache.nifi.processors.aws.sns.PutSNS\r
 org.apache.nifi.processors.aws.sqs.GetSQS\r
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITTagS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITTagS3Object.java
new file mode 100644 (file)
index 0000000..b1b42c0
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
+import com.amazonaws.services.s3.model.GetObjectTaggingResult;
+import com.amazonaws.services.s3.model.Tag;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Provides integration level testing with actual AWS S3 resources for {@link TagS3Object} and requires additional
+ * configuration and resources to work.
+ */
+public class ITTagS3Object extends AbstractS3IT {
+
+    @Test
+    public void testSimpleTag() throws Exception {
+        String objectKey = "test-file";
+        String tagKey = "nifi-key";
+        String tagValue = "nifi-val";
+
+        // put file in s3
+        putTestFile(objectKey, getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        // Set up processor
+        final TestRunner runner = TestRunners.newTestRunner(new TagS3Object());
+        runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(TagS3Object.REGION, REGION);
+        runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME);
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagValue);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", objectKey);
+        runner.enqueue(new byte[0], attrs);
+
+        // tag file
+        runner.run(1);
+
+        // Verify processor succeeds
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+
+        // Verify tag exists on S3 object
+        GetObjectTaggingResult res = client.getObjectTagging(new GetObjectTaggingRequest(BUCKET_NAME, objectKey));
+        assertTrue("Expected tag not found on S3 object", res.getTagSet().contains(new Tag(tagKey, tagValue)));
+    }
+
+    @Test
+    public void testAppendTag() throws Exception {
+        String objectKey = "test-file";
+        String tagKey = "nifi-key";
+        String tagValue = "nifi-val";
+
+        Tag existingTag = new Tag("oldkey", "oldvalue");
+
+        // put file in s3
+        putFileWithObjectTag(objectKey, getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), Arrays.asList(existingTag));
+
+        // Set up processor
+        final TestRunner runner = TestRunners.newTestRunner(new TagS3Object());
+        runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(TagS3Object.REGION, REGION);
+        runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME);
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagValue);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", objectKey);
+        runner.enqueue(new byte[0], attrs);
+
+        // tag file
+        runner.run(1);
+
+        // Verify processor succeeds
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+
+        // Verify new tag and existing exist on S3 object
+        GetObjectTaggingResult res = client.getObjectTagging(new GetObjectTaggingRequest(BUCKET_NAME, objectKey));
+        assertTrue("Expected new tag not found on S3 object", res.getTagSet().contains(new Tag(tagKey, tagValue)));
+        assertTrue("Expected existing tag not found on S3 object", res.getTagSet().contains(existingTag));
+    }
+
+    @Test
+    public void testReplaceTags() throws Exception {
+        String objectKey = "test-file";
+        String tagKey = "nifi-key";
+        String tagValue = "nifi-val";
+
+        Tag existingTag = new Tag("s3.tag.oldkey", "oldvalue");
+
+        // put file in s3
+        putFileWithObjectTag(objectKey, getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), Arrays.asList(existingTag));
+
+        // Set up processor
+        final TestRunner runner = TestRunners.newTestRunner(new TagS3Object());
+        runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(TagS3Object.REGION, REGION);
+        runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME);
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagValue);
+        runner.setProperty(TagS3Object.APPEND_TAG, "false");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", objectKey);
+        attrs.put("s3.tag."+existingTag.getKey(), existingTag.getValue());
+        runner.enqueue(new byte[0], attrs);
+
+        // tag file
+        runner.run(1);
+
+        // Verify processor succeeds
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+
+        // Verify flowfile attributes match s3 tags
+        MockFlowFile flowFiles = runner.getFlowFilesForRelationship(TagS3Object.REL_SUCCESS).get(0);
+        flowFiles.assertAttributeNotExists(existingTag.getKey());
+        flowFiles.assertAttributeEquals("s3.tag."+tagKey, tagValue);
+
+        // Verify new tag exists on S3 object and prior tag removed
+        GetObjectTaggingResult res = client.getObjectTagging(new GetObjectTaggingRequest(BUCKET_NAME, objectKey));
+        assertTrue("Expected new tag not found on S3 object", res.getTagSet().contains(new Tag(tagKey, tagValue)));
+        assertFalse("Existing tag not replaced on S3 object", res.getTagSet().contains(existingTag));
+    }
+}
+
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java
new file mode 100644 (file)
index 0000000..d337796
--- /dev/null
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.GetObjectTaggingResult;
+import com.amazonaws.services.s3.model.SetObjectTaggingRequest;
+import com.amazonaws.services.s3.model.Tag;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestTagS3Object {
+
+    private TestRunner runner = null;
+    private TagS3Object mockTagS3Object = null;
+    private AmazonS3Client actualS3Client = null;
+    private AmazonS3Client mockS3Client = null;
+
+    @Before
+    public void setUp() {
+        mockS3Client = Mockito.mock(AmazonS3Client.class);
+        mockTagS3Object = new TagS3Object() {
+            protected AmazonS3Client getClient() {
+                actualS3Client = client;
+                return mockS3Client;
+            }
+        };
+        runner = TestRunners.newTestRunner(mockTagS3Object);
+    }
+
+    @Test
+    public void testTagObjectSimple() throws IOException {
+        final String tagKey = "k";
+        final String tagVal = "v";
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
+        runner.setProperty(TagS3Object.APPEND_TAG, "false");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "object-key");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+        ArgumentCaptor<SetObjectTaggingRequest> captureRequest = ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
+        Mockito.verify(mockS3Client, Mockito.times(1)).setObjectTagging(captureRequest.capture());
+        SetObjectTaggingRequest request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        assertEquals("object-key", request.getKey());
+        assertNull("test-version", request.getVersionId());
+        assertTrue("Expected tag not found in request", request.getTagging().getTagSet().contains(new Tag(tagKey, tagVal)));
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals("s3.tag."+tagKey, tagVal);
+    }
+
+    @Test
+    public void testTagObjectVersion() throws IOException {
+        final String tagKey = "k";
+        final String tagVal = "v";
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.VERSION_ID, "test-version");
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
+        runner.setProperty(TagS3Object.APPEND_TAG, "false");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "object-key");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+        ArgumentCaptor<SetObjectTaggingRequest> captureRequest = ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
+        Mockito.verify(mockS3Client, Mockito.times(1)).setObjectTagging(captureRequest.capture());
+        SetObjectTaggingRequest request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        assertEquals("object-key", request.getKey());
+        assertEquals("test-version", request.getVersionId());
+        assertTrue("Expected tag not found in request", request.getTagging().getTagSet().contains(new Tag(tagKey, tagVal)));
+    }
+
+    @Test
+    public void testTagObjectAppendToExistingTags() throws IOException {
+        //set up existing tags on S3 object
+        Tag currentTag = new Tag("ck", "cv");
+        mockGetExistingTags(currentTag);
+
+        final String tagKey = "nk";
+        final String tagVal = "nv";
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "object-key");
+        attrs.put("s3.tag."+currentTag.getKey(), currentTag.getValue());
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+        ArgumentCaptor<SetObjectTaggingRequest> captureRequest = ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
+        Mockito.verify(mockS3Client, Mockito.times(1)).setObjectTagging(captureRequest.capture());
+        SetObjectTaggingRequest request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        assertEquals("object-key", request.getKey());
+        assertTrue("New tag not found in request", request.getTagging().getTagSet().contains(new Tag(tagKey, tagVal)));
+        assertTrue("Existing tag not found in request", request.getTagging().getTagSet().contains(currentTag));
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals("s3.tag."+tagKey, tagVal);
+        ff0.assertAttributeEquals("s3.tag."+currentTag.getKey(), currentTag.getValue());
+    }
+
+    @Test
+    public void testTagObjectAppendUpdatesExistingTagValue() throws IOException {
+        //set up existing tags on S3 object
+        Tag currentTag1 = new Tag("ck", "cv");
+        Tag currentTag2 = new Tag("nk", "ov");
+        mockGetExistingTags(currentTag1, currentTag2);
+
+        final String tagKey = "nk";
+        final String tagVal = "nv";
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "object-key");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+        ArgumentCaptor<SetObjectTaggingRequest> captureRequest = ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
+        Mockito.verify(mockS3Client, Mockito.times(1)).setObjectTagging(captureRequest.capture());
+        SetObjectTaggingRequest request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        assertEquals("object-key", request.getKey());
+        assertTrue("New tag not found in request", request.getTagging().getTagSet().contains(new Tag(tagKey, tagVal)));
+        assertTrue("Existing tag not found in request", request.getTagging().getTagSet().contains(currentTag1));
+        assertFalse("Existing tag should be excluded from request", request.getTagging().getTagSet().contains(currentTag2));
+    }
+
+    @Test
+    public void testTagObjectReplacesExistingTags() throws IOException {
+        //set up existing tags on S3 object
+        Tag currentTag = new Tag("ck", "cv");
+        mockGetExistingTags(currentTag);
+
+        final String tagKey = "nk";
+        final String tagVal = "nv";
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
+        runner.setProperty(TagS3Object.APPEND_TAG, "false");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "object-key");
+        attrs.put("s3.tag."+currentTag.getKey(), currentTag.getValue());
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+        ArgumentCaptor<SetObjectTaggingRequest> captureRequest = ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
+        Mockito.verify(mockS3Client, Mockito.times(1)).setObjectTagging(captureRequest.capture());
+        SetObjectTaggingRequest request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        assertEquals("object-key", request.getKey());
+        assertTrue("New tag not found in request", request.getTagging().getTagSet().contains(new Tag(tagKey, tagVal)));
+        assertFalse("Existing tag should be excluded from request", request.getTagging().getTagSet().contains(currentTag));
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals("s3.tag."+tagKey, tagVal);
+        ff0.assertAttributeNotExists("s3.tag."+currentTag.getKey());
+    }
+
+    @Test
+    public void testTagObjectS3Exception() {
+        //set up existing tags on S3 object
+        Tag currentTag = new Tag("ck", "cv");
+        mockGetExistingTags(currentTag);
+
+        final String tagKey = "nk";
+        final String tagVal = "nv";
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "delete-key");
+        runner.enqueue(new byte[0], attrs);
+        Mockito.doThrow(new AmazonS3Exception("TagFailure")).when(mockS3Client).setObjectTagging(Mockito.any());
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
+        ArgumentCaptor<SetObjectTaggingRequest> captureRequest = ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
+    }
+
+    @Test
+    public void testGetPropertyDescriptors() throws Exception {
+        TagS3Object processor = new TagS3Object();
+        List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
+        assertEquals("size should be eq", 20, pd.size());
+        assertTrue(pd.contains(TagS3Object.ACCESS_KEY));
+        assertTrue(pd.contains(TagS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
+        assertTrue(pd.contains(TagS3Object.BUCKET));
+        assertTrue(pd.contains(TagS3Object.CREDENTIALS_FILE));
+        assertTrue(pd.contains(TagS3Object.ENDPOINT_OVERRIDE));
+        assertTrue(pd.contains(TagS3Object.KEY));
+        assertTrue(pd.contains(TagS3Object.REGION));
+        assertTrue(pd.contains(TagS3Object.SECRET_KEY));
+        assertTrue(pd.contains(TagS3Object.SIGNER_OVERRIDE));
+        assertTrue(pd.contains(TagS3Object.SSL_CONTEXT_SERVICE));
+        assertTrue(pd.contains(TagS3Object.TIMEOUT));
+        assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
+        assertTrue(pd.contains(TagS3Object.PROXY_HOST));
+        assertTrue(pd.contains(TagS3Object.PROXY_HOST_PORT));
+        assertTrue(pd.contains(TagS3Object.PROXY_USERNAME));
+        assertTrue(pd.contains(TagS3Object.PROXY_PASSWORD));
+        assertTrue(pd.contains(TagS3Object.TAG_KEY));
+        assertTrue(pd.contains(TagS3Object.TAG_VALUE));
+        assertTrue(pd.contains(TagS3Object.APPEND_TAG));
+        assertTrue(pd.contains(TagS3Object.VERSION_ID));
+    }
+
+    @Test
+    public void testBucketEvaluatedAsBlank() {
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "${not.existant.attribute}");
+        runner.setProperty(TagS3Object.TAG_KEY, "key");
+        runner.setProperty(TagS3Object.TAG_VALUE, "val");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "delete-key");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testTagKeyEvaluatedAsBlank() {
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, "${not.existant.attribute}");
+        runner.setProperty(TagS3Object.TAG_VALUE, "val");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "delete-key");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testTagValEvaluatedAsBlank() {
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, "tagKey");
+        runner.setProperty(TagS3Object.TAG_VALUE, "${not.existant.attribute}");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "delete-key");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
+    }
+
+    private void mockGetExistingTags(Tag... currentTag) {
+        List<Tag> currentTags = new ArrayList<>(Arrays.asList(currentTag));
+        Mockito.when(mockS3Client.getObjectTagging(Mockito.anyObject())).thenReturn(new GetObjectTaggingResult(currentTags));
+    }
+}