FLUME-3101 Add maxBatchCount config property to Taildir Source.
authorPeter Turcsanyi <turcsanyi@cloudera.com>
Thu, 22 Nov 2018 16:12:57 +0000 (17:12 +0100)
committerFerenc Szabo <szaboferee@apache.org>
Thu, 22 Nov 2018 16:12:57 +0000 (17:12 +0100)
If there are multiple files in the path(s) that need to be tailed and there
is a file written by high frequency, then Taildir can read the batchSize size
events from that file every time. This can lead to an endless loop and Taildir
will only read data from the busy file, while other files will not be
processed.
Another problem is that in this case TaildirSource will be unresponsive to
stop requests too.

This commit handles this situation by introducing a new config property called
maxBatchCount. It controls the number of batches being read consecutively
from the same file. After reading maxBatchCount rounds from a file, Taildir
will switch to another file / will have a break in the processing.

This change is based on hunshenshi's patch.

This closes #240

Reviewers: Ferenc Szabo, Endre Major

(Peter Turcsanyi via Ferenc Szabo)

flume-ng-doc/sphinx/FlumeUserGuide.rst
flume-ng-sources/flume-taildir-source/pom.xml
flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java

index c6d947a..01eb81d 100644 (file)
@@ -1375,6 +1375,10 @@ skipToEnd                           false                          Whether to sk
 idleTimeout                         120000                         Time (ms) to close inactive files. If the closed file is appended new lines to, this source will automatically re-open it.
 writePosInterval                    3000                           Interval time (ms) to write the last position of each file on the position file.
 batchSize                           100                            Max number of lines to read and send to the channel at a time. Using the default is usually fine.
+maxBatchCount                       Long.MAX_VALUE                 Controls the number of batches being read consecutively from the same file.
+                                                                   If the source is tailing multiple files and one of them is written at a fast rate,
+                                                                   it can prevent other files to be processed, because the busy file would be read in an endless loop.
+                                                                   In this case lower this value.
 backoffSleepIncrement               1000                           The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data.
 maxBackoffSleep                     5000                           The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data.
 cachePatternMatching                true                           Listing directories and applying the filename regex pattern may be time consuming for directories
@@ -1401,6 +1405,7 @@ Example for agent named a1:
   a1.sources.r1.headers.f2.headerKey1 = value2
   a1.sources.r1.headers.f2.headerKey2 = value2-2
   a1.sources.r1.fileHeader = true
+  a1.sources.ri.maxBatchCount = 1000
 
 Twitter 1% firehose Source (experimental)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
index 9cc07a3..192aaeb 100644 (file)
@@ -32,7 +32,7 @@ limitations under the License.
 
   <properties>
     <!-- TODO fix spotbugs/pmd violations -->
-    <spotbugs.maxAllowedViolations>13</spotbugs.maxAllowedViolations>
+    <spotbugs.maxAllowedViolations>14</spotbugs.maxAllowedViolations>
     <pmd.maxAllowedViolations>3</pmd.maxAllowedViolations>
   </properties>
 
index e121a2b..15ba507 100644 (file)
@@ -87,6 +87,7 @@ public class TaildirSource extends AbstractSource implements
   private Long maxBackOffSleepInterval;
   private boolean fileHeader;
   private String fileHeaderKey;
+  private Long maxBatchCount;
 
   @Override
   public synchronized void start() {
@@ -185,6 +186,12 @@ public class TaildirSource extends AbstractSource implements
             DEFAULT_FILE_HEADER);
     fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
             DEFAULT_FILENAME_HEADER_KEY);
+    maxBatchCount = context.getLong(MAX_BATCH_COUNT, DEFAULT_MAX_BATCH_COUNT);
+    if (maxBatchCount <= 0) {
+      maxBatchCount = DEFAULT_MAX_BATCH_COUNT;
+      logger.warn("Invalid maxBatchCount specified, initializing source "
+          + "default maxBatchCount of {}", maxBatchCount);
+    }
 
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
@@ -258,6 +265,7 @@ public class TaildirSource extends AbstractSource implements
 
   private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
       throws IOException, InterruptedException {
+    long batchCount = 0;
     while (true) {
       reader.setCurrentFile(tf);
       List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
@@ -282,6 +290,11 @@ public class TaildirSource extends AbstractSource implements
       sourceCounter.addToEventAcceptedCount(events.size());
       sourceCounter.incrementAppendBatchAcceptedCount();
       if (events.size() < batchSize) {
+        logger.debug("The events taken from " + tf.getPath() + " is less than " + batchSize);
+        break;
+      }
+      if (++batchCount >= maxBatchCount) {
+        logger.debug("The batches read from the same file is larger than " + maxBatchCount );
         break;
       }
     }
index f2347f3..c614e26 100644 (file)
@@ -63,4 +63,8 @@ public class TaildirSourceConfigurationConstants {
   /** Whether to include absolute path filename in a header. */
   public static final String FILENAME_HEADER = "fileHeader";
   public static final boolean DEFAULT_FILE_HEADER = false;
+
+  /** The max number of batch reads from a file in one loop */
+  public static final String MAX_BATCH_COUNT = "maxBatchCount";
+  public static final Long DEFAULT_MAX_BATCH_COUNT = Long.MAX_VALUE;
 }
index 6825cc5..416e82a 100644 (file)
@@ -55,6 +55,8 @@ import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstant
 import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.POSITION_FILE;
 import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER;
 import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER_KEY;
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.BATCH_SIZE;
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.MAX_BATCH_COUNT;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -380,4 +382,59 @@ public class TestTaildirSource {
     source.stop();
   }
 
+  @Test
+  public void testMaxBatchCount() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    File f2 = new File(tmpDir, "file2");
+    Files.write("file1line1\nfile1line2\n" +
+        "file1line3\nfile1line4\n", f1, Charsets.UTF_8);
+    Files.write("file2line1\nfile2line2\n" +
+        "file2line3\nfile2line4\n", f2, Charsets.UTF_8);
+
+    Context context = new Context();
+    context.put(POSITION_FILE, posFilePath);
+    context.put(FILE_GROUPS, "fg");
+    context.put(FILE_GROUPS_PREFIX + "fg", tmpDir.getAbsolutePath() + "/file.*");
+    context.put(BATCH_SIZE, String.valueOf(1));
+    context.put(MAX_BATCH_COUNT, String.valueOf(2));
+
+    Configurables.configure(source, context);
+    source.start();
+
+    // 2 x 4 lines will be processed in 2 rounds
+    source.process();
+    source.process();
+
+    List<Event> eventList = new ArrayList<Event>();
+    for (int i = 0; i < 8; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      Event e = channel.take();
+      txn.commit();
+      txn.close();
+      if (e == null) {
+        break;
+      }
+      eventList.add(e);
+    }
+
+    assertEquals("1", context.getString(BATCH_SIZE));
+    assertEquals("2", context.getString(MAX_BATCH_COUNT));
+
+    assertEquals(8, eventList.size());
+
+    // the processing order of the files is not deterministic
+    String firstFile = new String(eventList.get(0).getBody()).substring(0, 5);
+    String secondFile = firstFile.equals("file1") ? "file2" : "file1";
+
+    assertEquals(firstFile + "line1", new String(eventList.get(0).getBody()));
+    assertEquals(firstFile + "line2", new String(eventList.get(1).getBody()));
+    assertEquals(secondFile + "line1", new String(eventList.get(2).getBody()));
+    assertEquals(secondFile + "line2", new String(eventList.get(3).getBody()));
+    assertEquals(firstFile + "line3", new String(eventList.get(4).getBody()));
+    assertEquals(firstFile + "line4", new String(eventList.get(5).getBody()));
+    assertEquals(secondFile + "line3", new String(eventList.get(6).getBody()));
+    assertEquals(secondFile + "line4", new String(eventList.get(7).getBody()));
+  }
+
 }