NIFI-5919: Addressed a race condition that can exist if adding FlowFiles to a FlowFil...
authorMark Payne <markap14@hotmail.com>
Fri, 28 Dec 2018 15:35:48 +0000 (10:35 -0500)
committerPierre Villard <pierre.villard.fr@gmail.com>
Thu, 3 Jan 2019 10:35:23 +0000 (11:35 +0100)
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #3238.

nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java

index 84731f7..353af49 100644 (file)
@@ -769,8 +769,26 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
                 putAll(flowFiles);
             } else {
                 logger.debug("Received the following FlowFiles from Peer: {}. Will accept FlowFiles to the local partition", flowFiles);
-                localPartition.putAll(flowFiles);
+
+                // As explained in the putAllAndGetPartitions() method, we must ensure that we call adjustSize() before we
+                // put the FlowFiles on the queue. Otherwise, we will encounter a race condition. Specifically, that race condition
+                // can play out like so:
+                //
+                // Thread 1: Call localPartition.putAll() when the queue is empty (has a queue size of 0) but has not yet adjusted the size.
+                // Thread 2: Call poll() to obtain the FlowFile just received.
+                // Thread 2: Transfer the FlowFile to some Relationship
+                // Thread 2: Commit the session, which will call acknowledge on this queue.
+                // Thread 2: The acknowledge() method attempts to decrement the size of the queue to -1.
+                //           This causes an Exception to be thrown and the queue size to remain at 0.
+                //           However, the FlowFile has already been successfully transferred to the next Queue.
+                // Thread 1: Call adjustSize() to increment the size of the queue to 1 FlowFile.
+                //
+                // In this scenario, we now have no FlowFiles in the queue. However, the queue size is set to 1.
+                // We can avoid this race condition by simply ensuring that we call adjustSize() before making the FlowFiles
+                // available on the queue. This way, we cannot possibly obtain the FlowFiles and process/acknowledge them before the queue
+                // size has been updated to account for them and therefore we will not attempt to assign a negative queue size.
                 adjustSize(flowFiles.size(), flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
+                localPartition.putAll(flowFiles);
             }
         } finally {
             partitionReadLock.unlock();