NIFI-5892 Wait timestamp lingers, potentially messing up downstream wait-notify pairs
authorOtto Fowler <ottobackwards@gmail.com>
Fri, 21 Dec 2018 14:15:24 +0000 (09:15 -0500)
committerKoji Kawamura <ijokarumawak@apache.org>
Fri, 28 Dec 2018 01:30:02 +0000 (10:30 +0900)
Clear the wait timestamp when transferring to failur or success

replace explicit attribute clear with function call, refactor and integrate into existing tests per review

This closes #3233.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java

index cfec15e..e297556 100644 (file)
@@ -84,7 +84,8 @@ import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJE
 )
 @WritesAttributes({
         @WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
-        + "initial epoch timestamp when the file first entered this processor.  This is used to determine the expiration time of the FlowFile."),
+        + "initial epoch timestamp when the file first entered this processor.  This is used to determine the expiration time of the FlowFile.  "
+        + "This attribute is not written when the FlowFile is transferred to failure or success"),
         @WritesAttribute(attribute = "wait.counter.<counterName>", description = "If a signal exists when the processor runs, "
         + "each count value in the signal is copied.")
 })
@@ -314,6 +315,8 @@ public class Wait extends AbstractProcessor {
 
         final Consumer<FlowFile> transferToFailure = flowFile -> {
             flowFile = session.penalize(flowFile);
+            // This flowFile is now failed, our tracking is done, clear the timer
+            flowFile = clearWaitState(session, flowFile);
             getFlowFilesFor.apply(REL_FAILURE).add(flowFile);
         };
 
@@ -328,9 +331,19 @@ public class Wait extends AbstractProcessor {
                     relationship = Relationship.SELF;
                 }
             }
-
+            final Relationship finalRelationship = relationship;
             final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
-                    .map(f -> copySignalAttributes(session, f, signalRef.get(), originalSignalCounts, replaceOriginalAttributes)).collect(Collectors.toList());
+                    .map(f -> {
+                        if (REL_SUCCESS.equals(finalRelationship)) {
+                            // These flowFiles will be exiting the wait, clear the timer
+                            f = clearWaitState(session, f);
+                        }
+                        return copySignalAttributes(session, f, signalRef.get(),
+                            originalSignalCounts,
+                            replaceOriginalAttributes);
+                    })
+                    .collect(Collectors.toList());
+
             session.transfer(flowFilesWithSignalAttributes, relationship);
         };
 
@@ -470,6 +483,10 @@ public class Wait extends AbstractProcessor {
 
     }
 
+    private FlowFile clearWaitState(final ProcessSession session, final FlowFile flowFile) {
+        return session.removeAttribute(flowFile, WAIT_START_TIMESTAMP);
+    }
+
     private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount, final boolean replaceOriginal) {
         if (signal == null) {
             return flowFile;
index 5b5b6fc..2ccb2fe 100644 (file)
@@ -69,6 +69,8 @@ public class TestWait {
 
         // no cache key attribute
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+        // timestamp must be present
+        runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0).assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
         runner.clearTransferState();
     }
 
@@ -101,6 +103,7 @@ public class TestWait {
 
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         runner.clearTransferState();
         runner.enqueue(ff);
@@ -126,7 +129,7 @@ public class TestWait {
 
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-
+        ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
         runner.clearTransferState();
         runner.enqueue(ff);
 
@@ -164,6 +167,7 @@ public class TestWait {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
+        runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         runner.clearTransferState();
     }
 
@@ -178,6 +182,7 @@ public class TestWait {
 
         runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
         runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
+        runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         runner.clearTransferState();
     }
 
@@ -231,6 +236,8 @@ public class TestWait {
 
         final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
 
+        // timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
         // show that the original attributes are still there
@@ -272,6 +279,8 @@ public class TestWait {
 
         final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
 
+        // timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
         // show that the original attributes are still there
@@ -310,7 +319,7 @@ public class TestWait {
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
         /*
          * 2nd iteration.
          */
@@ -324,6 +333,7 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since total count doesn't reach to 3.
         waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 3rd iteration.
@@ -335,6 +345,7 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since total count doesn't reach to 3.
         waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 4th iteration.
@@ -350,6 +361,9 @@ public class TestWait {
 
         final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
 
+        // wait timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
         // show that the original attributes are still there
@@ -391,6 +405,7 @@ public class TestWait {
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 2nd iteration.
@@ -405,6 +420,7 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since counter-B doesn't reach to 2.
         waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 3rd iteration.
@@ -419,6 +435,7 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since total count doesn't reach to 3.
         waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 4th iteration.
@@ -434,6 +451,8 @@ public class TestWait {
 
         final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
 
+        // wait timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
         // show that the original attributes are still there
@@ -486,6 +505,8 @@ public class TestWait {
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
         MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+        // timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         outputFlowFile.assertAttributeEquals("wait.counter.counter", "2");
 
         // expect counter to be decremented to 0 and releasable count remains 1.
@@ -502,6 +523,8 @@ public class TestWait {
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
         outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+        // timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // All counters are consumed.
         outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");