FLUME-3294 Fix polling logic in TaildirSource
authorPeter Turcsanyi <turcsanyi@cloudera.com>
Thu, 22 Nov 2018 20:51:28 +0000 (21:51 +0100)
committerFerenc Szabo <szaboferee@apache.org>
Thu, 22 Nov 2018 20:51:28 +0000 (21:51 +0100)
TaildirSource.process() implements the correct polling logic now. It returns
Status.READY / Status.BACKOFF which controls the common backoff sleeping
mechanism implemented in PollableSourceRunner.PollingRunner (instead of
always returning Status.READY and sleeping inside the method which was
an incorrect behaviour).

This closes #241

Reviewers: Endre Major, Denes Arvay

(Peter Turcsanyi via Ferenc Szabo)

flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java

index 15ba507..9ecccd7 100644 (file)
@@ -229,22 +229,20 @@ public class TaildirSource extends AbstractSource implements
 
   @Override
   public Status process() {
-    Status status = Status.READY;
+    Status status = Status.BACKOFF;
     try {
       existingInodes.clear();
       existingInodes.addAll(reader.updateTailFiles());
       for (long inode : existingInodes) {
         TailFile tf = reader.getTailFiles().get(inode);
         if (tf.needTail()) {
-          tailFileProcess(tf, true);
+          boolean hasMoreLines = tailFileProcess(tf, true);
+          if (hasMoreLines) {
+            status = Status.READY;
+          }
         }
       }
       closeTailFiles();
-      try {
-        TimeUnit.MILLISECONDS.sleep(retryInterval);
-      } catch (InterruptedException e) {
-        logger.info("Interrupted while sleeping");
-      }
     } catch (Throwable t) {
       logger.error("Unable to tail files", t);
       sourceCounter.incrementEventReadFail();
@@ -263,14 +261,14 @@ public class TaildirSource extends AbstractSource implements
     return maxBackOffSleepInterval;
   }
 
-  private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
+  private boolean tailFileProcess(TailFile tf, boolean backoffWithoutNL)
       throws IOException, InterruptedException {
     long batchCount = 0;
     while (true) {
       reader.setCurrentFile(tf);
       List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
       if (events.isEmpty()) {
-        break;
+        return false;
       }
       sourceCounter.addToEventReceivedCount(events.size());
       sourceCounter.incrementAppendBatchReceivedCount();
@@ -291,11 +289,11 @@ public class TaildirSource extends AbstractSource implements
       sourceCounter.incrementAppendBatchAcceptedCount();
       if (events.size() < batchSize) {
         logger.debug("The events taken from " + tf.getPath() + " is less than " + batchSize);
-        break;
+        return false;
       }
       if (++batchCount >= maxBatchCount) {
         logger.debug("The batches read from the same file is larger than " + maxBatchCount );
-        break;
+        return true;
       }
     }
   }
index 416e82a..1c30cd4 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.PollableSource.Status;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
@@ -437,4 +438,38 @@ public class TestTaildirSource {
     assertEquals(secondFile + "line4", new String(eventList.get(7).getBody()));
   }
 
+  @Test
+  public void testStatus() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    File f2 = new File(tmpDir, "file2");
+    Files.write("file1line1\nfile1line2\n" +
+        "file1line3\nfile1line4\nfile1line5\n", f1, Charsets.UTF_8);
+    Files.write("file2line1\nfile2line2\n" +
+        "file2line3\n", f2, Charsets.UTF_8);
+
+    Context context = new Context();
+    context.put(POSITION_FILE, posFilePath);
+    context.put(FILE_GROUPS, "fg");
+    context.put(FILE_GROUPS_PREFIX + "fg", tmpDir.getAbsolutePath() + "/file.*");
+    context.put(BATCH_SIZE, String.valueOf(1));
+    context.put(MAX_BATCH_COUNT, String.valueOf(2));
+
+    Configurables.configure(source, context);
+    source.start();
+
+    Status status;
+
+    status = source.process();
+    assertEquals(Status.READY, status);
+
+    status = source.process();
+    assertEquals(Status.READY, status);
+
+    status = source.process();
+    assertEquals(Status.BACKOFF, status);
+
+    status = source.process();
+    assertEquals(Status.BACKOFF, status);
+  }
+
 }