FLUME-3314 Fixed NPE in Kafka source/channel during offset migration (#276)
authorturcsanyip <35004384+turcsanyip@users.noreply.github.com>
Thu, 7 Feb 2019 11:26:58 +0000 (12:26 +0100)
committerFerenc Szabó <szaboferee@gmail.com>
Thu, 7 Feb 2019 11:26:57 +0000 (12:26 +0100)
* FLUME-3314 Fixed NPE in Kafka source/channel during offset migration

Kafka source/channel threw NPE when migrateOffsets() was called
on nonexistent topics.
It has been fixed by adding null check and logging a warning message
(topic not found, skipping offset migration).

After skipping the offset migration, the source/channel works the same way
as the "non offset migration" case:
- starts and prints warning messages about the non-existing topic periodically
- can recover if the topic is created later

* FLUME-3314 Additional null checks.

Reviewers: Ferenc Szabo

(Peter Turcsanyi via Ferenc Szabo)

flume-ng-channels/flume-kafka-channel/pom.xml
flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
flume-ng-sources/flume-kafka-source/pom.xml
flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java

index 7819826..9bf1e27 100644 (file)
@@ -71,7 +71,11 @@ limitations under the License.
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
index 40494d4..852b4bd 100644 (file)
@@ -318,6 +318,10 @@ public class KafkaChannel extends BasicChannelSemantics {
             Time.SYSTEM, "kafka.server", "SessionExpireListener");
          KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
       Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer);
+      if (kafkaOffsets == null) {
+        logger.warn("Topic " + topicStr + " not found in Kafka. Offset migration will be skipped.");
+        return;
+      }
       if (!kafkaOffsets.isEmpty()) {
         logger.info("Found Kafka offsets for topic {}. Will not migrate from zookeeper", topicStr);
         logger.debug("Offsets found: {}", kafkaOffsets);
@@ -338,7 +342,8 @@ public class KafkaChannel extends BasicChannelSemantics {
       // Read the offsets to verify they were committed
       Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets = getKafkaOffsets(consumer);
       logger.debug("Offsets committed: {}", newKafkaOffsets);
-      if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
+      if (newKafkaOffsets == null
+          || !newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
         throw new FlumeException("Offsets could not be committed");
       }
     }
@@ -347,13 +352,16 @@ public class KafkaChannel extends BasicChannelSemantics {
 
   private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
       KafkaConsumer<String, byte[]> client) {
-    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+    Map<TopicPartition, OffsetAndMetadata> offsets = null;
     List<PartitionInfo> partitions = client.partitionsFor(topicStr);
-    for (PartitionInfo partition : partitions) {
-      TopicPartition key = new TopicPartition(topicStr, partition.partition());
-      OffsetAndMetadata offsetAndMetadata = client.committed(key);
-      if (offsetAndMetadata != null) {
-        offsets.put(key, offsetAndMetadata);
+    if (partitions != null) {
+      offsets = new HashMap<>();
+      for (PartitionInfo partition : partitions) {
+        TopicPartition key = new TopicPartition(topicStr, partition.partition());
+        OffsetAndMetadata offsetAndMetadata = client.committed(key);
+        if (offsetAndMetadata != null) {
+          offsets.put(key, offsetAndMetadata);
+        }
       }
     }
     return offsets;
index 2362c0d..7657aa6 100644 (file)
@@ -22,6 +22,7 @@ import kafka.zk.KafkaZkClient;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
+import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -190,4 +191,20 @@ public class TestOffsetsAndMigration extends TestKafkaChannelBase {
       Assert.assertTrue("Channel should read the 11th message", finals.contains(11));
     }
   }
+
+  @Test
+  public void testMigrateZookeeperOffsetsWhenTopicNotExists() throws Exception {
+    topic = findUnusedTopic();
+
+    Context context = prepareDefaultContext(false);
+    context.put(ZOOKEEPER_CONNECT_FLUME_KEY, testUtil.getZkUrl());
+    context.put(GROUP_ID_FLUME, "testMigrateOffsets-nonExistingTopic");
+    KafkaChannel channel = createChannel(context);
+
+    channel.start();
+
+    Assert.assertEquals(LifecycleState.START, channel.getLifecycleState());
+
+    channel.stop();
+  }
 }
index a2071fe..55fa20d 100644 (file)
@@ -1,5 +1,4 @@
 # Licensed to the Apache Software Foundation (ASF) under one or more
-# Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
 # The ASF licenses this file to You under the Apache License, Version 2.0
@@ -118,3 +117,5 @@ zookeeper.connect=localhost:2181
 zookeeper.connection.timeout.ms=1000000
 
 offsets.topic.replication.factor=1
+
+auto.create.topics.enable=false
index 2312247..b6e1207 100644 (file)
@@ -1,5 +1,4 @@
 # Licensed to the Apache Software Foundation (ASF) under one or more
-# Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
 # The ASF licenses this file to You under the Apache License, Version 2.0
index 0ec9d72..affc999 100644 (file)
       <classifier>test</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
index 20f7c7d..b02285d 100644 (file)
@@ -567,6 +567,10 @@ public class KafkaSource extends AbstractPollableSource
          KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps)) {
       Map<TopicPartition, OffsetAndMetadata> kafkaOffsets =
           getKafkaOffsets(consumer, topicStr);
+      if (kafkaOffsets == null) {
+        log.warn("Topic " + topicStr + " not found in Kafka. Offset migration will be skipped.");
+        return;
+      }
       if (!kafkaOffsets.isEmpty()) {
         log.info("Found Kafka offsets for topic " + topicStr +
             ". Will not migrate from zookeeper");
@@ -589,7 +593,8 @@ public class KafkaSource extends AbstractPollableSource
       Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets =
           getKafkaOffsets(consumer, topicStr);
       log.debug("Offsets committed: {}", newKafkaOffsets);
-      if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
+      if (newKafkaOffsets == null
+          || !newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
         throw new FlumeException("Offsets could not be committed");
       }
     }
@@ -597,13 +602,16 @@ public class KafkaSource extends AbstractPollableSource
 
   private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
       KafkaConsumer<String, byte[]> client, String topicStr) {
-    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+    Map<TopicPartition, OffsetAndMetadata> offsets = null;
     List<PartitionInfo> partitions = client.partitionsFor(topicStr);
-    for (PartitionInfo partition : partitions) {
-      TopicPartition key = new TopicPartition(topicStr, partition.partition());
-      OffsetAndMetadata offsetAndMetadata = client.committed(key);
-      if (offsetAndMetadata != null) {
-        offsets.put(key, offsetAndMetadata);
+    if (partitions != null) {
+      offsets = new HashMap<>();
+      for (PartitionInfo partition : partitions) {
+        TopicPartition key = new TopicPartition(topicStr, partition.partition());
+        OffsetAndMetadata offsetAndMetadata = client.committed(key);
+        if (offsetAndMetadata != null) {
+          offsets.put(key, offsetAndMetadata);
+        }
       }
     }
     return offsets;
index f4fe57d..56a582a 100644 (file)
@@ -76,6 +76,7 @@ public class KafkaSourceEmbeddedKafka {
     props.put("port", String.valueOf(serverPort));
     props.put("log.dir", dir.getAbsolutePath());
     props.put("offsets.topic.replication.factor", "1");
+    props.put("auto.create.topics.enable", "false");
     if (properties != null) {
       props.putAll(properties);
     }
index a82c972..d866c98 100644 (file)
@@ -34,6 +34,7 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.PollableSource.Status;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -340,6 +341,8 @@ public class TestKafkaSource {
     startKafkaSource();
     Thread.sleep(500L);
 
+    assertEquals(LifecycleState.START, kafkaSource.getLifecycleState());
+
     Status status = kafkaSource.process();
     assertEquals(Status.BACKOFF, status);
   }
@@ -845,7 +848,7 @@ public class TestKafkaSource {
     kafkaSource.stop();
   }
 
-  public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
+  private void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
                                             String group) throws Exception {
     // create a topic with 1 partition for simplicity
     String topic = findUnusedTopic();
@@ -928,6 +931,27 @@ public class TestKafkaSource {
     }
   }
 
+  @Test
+  public void testMigrateZookeeperOffsetsWhenTopicNotExists() throws Exception {
+    String topic = findUnusedTopic();
+
+    Context context = prepareDefaultContext("testMigrateOffsets-nonExistingTopic");
+    context.put(ZOOKEEPER_CONNECT_FLUME_KEY, kafkaServer.getZkConnectString());
+    context.put(TOPIC, topic);
+    KafkaSource source = new KafkaSource();
+    source.doConfigure(context);
+
+    source.setChannelProcessor(createGoodChannel());
+    source.start();
+
+    assertEquals(LifecycleState.START, source.getLifecycleState());
+
+    Status status = source.process();
+    assertEquals(Status.BACKOFF, status);
+
+    source.stop();
+  }
+
   ChannelProcessor createGoodChannel() {
 
     ChannelProcessor channelProcessor = mock(ChannelProcessor.class);