FLUME-2799 Kafka Source - Add message offset to headers
authorFerenc Szabo <szaboferee@apache.org>
Thu, 22 Nov 2018 14:45:52 +0000 (15:45 +0100)
committerFerenc Szabo <szaboferee@apache.org>
Thu, 22 Nov 2018 14:45:52 +0000 (15:45 +0100)
It seems when solving https://issues.apache.org/jira/browse/FLUME-2799 ,
an oversight resulted in the message offset not being added to the header.

This change corrects this.

This closes #238

Reviewers: Ferenc Szabo, Peter Turcsanyi

(Jehan Bruggeman via Ferenc Szabo)

flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java

index ddffa87..20f7c7d 100644 (file)
@@ -269,6 +269,10 @@ public class KafkaSource extends AbstractPollableSource
           headers.put(KafkaSourceConstants.PARTITION_HEADER,
               String.valueOf(message.partition()));
         }
+        if (!headers.containsKey(OFFSET_HEADER)) {
+          headers.put(OFFSET_HEADER,
+              String.valueOf(message.offset()));
+        }
 
         if (kafkaKey != null) {
           headers.put(KafkaSourceConstants.KEY_HEADER, kafkaKey);
index 474a143..0e15e73 100644 (file)
@@ -53,6 +53,7 @@ public class KafkaSourceConstants {
   public static final String KEY_HEADER = "key";
   public static final String TIMESTAMP_HEADER = "timestamp";
   public static final String PARTITION_HEADER = "partition";
+  public static final String OFFSET_HEADER = "offset";
 
   public static final String SET_TOPIC_HEADER = "setTopicHeader";
   public static final boolean DEFAULT_SET_TOPIC_HEADER = true;