FLUME-3281 Update to Kafka 2.0
authorEndre Major <emajor@cloudera.com>
Tue, 20 Nov 2018 16:16:51 +0000 (17:16 +0100)
committerFerenc Szabo <szaboferee@apache.org>
Tue, 20 Nov 2018 16:16:51 +0000 (17:16 +0100)
This has been tested with unit tests. The main difference that caused the most
problems is the consumer.poll(Duration) change. This does not block even when
it fetches meta data whereas the previous poll(long timeout) blocked
indefinitely for meta data fetching.
This has resulted in many test timing issues. I tried to do minimal changes at
the tests, just enough to make them pass.

Kafka 2.0 requires a higher version for slf4j, I had to update it to 1.7.25.

Option migrateZookeeperOffsets is deprecated in this PR.
This will allow us to get rid of Kafka server libraries in Flume.

Compatibility testing.
Modified the TestUtil to be able to use external servers. This way I could test
against a variety of Kafka Server versions using the normal unit tests.
Channel tests using 2.0.1 client:
Kafka_2.11_0.9.0.0 Not compatible
Kafka_2.11_0.10.0.0 Not compatible
Kafka_2.11_0.10.1.0 passed with TestPartition timeouts
(rerunning the single test passes so it is a tes isolation issue)
Kafka_2.11_0.10.2.0 passed with TestPartition timeouts
(rerunning the single test passes so it is a tes isolation issue)
Kafka_2.11-0.11.0.3 - timeouts in TestPartitions when creating topics
Kafka_2.11-1.0.2 - passed
Kafka_2.11-1.1.1 - passed
Kafka_2.11-2.0.1 - passed

This closes #235

Reviewers: Tristan Stevens, Ferenc Szabo, Peter Turcsanyi

(Endre Major via Ferenc Szabo)

21 files changed:
LICENSE
flume-ng-channels/flume-kafka-channel/pom.xml
flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java
flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
flume-ng-channels/flume-kafka-channel/src/test/resources/kafka-server.properties
flume-ng-doc/sphinx/FlumeUserGuide.rst
flume-ng-sinks/flume-ng-kafka-sink/pom.xml
flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java [deleted file]
flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java
flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/kafka-server.properties
flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/testutil.properties [new file with mode: 0644]
flume-ng-sources/flume-kafka-source/pom.xml
flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
flume-shared/flume-shared-kafka-test/pom.xml
flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java
flume-shared/flume-shared-kafka/src/test/java/org/apache/flume/shared/kafka/KafkaSSLUtilTest.java
pom.xml

diff --git a/LICENSE b/LICENSE
index 4331d40..40cf470 100644 (file)
--- a/LICENSE
+++ b/LICENSE
@@ -240,7 +240,7 @@ For
  jetty-<version>.jar:
  jetty-util-<version>.jar:
  joda-time-<version>.jar:
- kafka_2.10-<version>.jar:
+ kafka_2.11-<version>.jar:
  kafka-clients-<version>.jar:
  kite-data-core-<version>.jar:
  kite-data-hbase-<version>.jar:
index be5bf74..b9fb0d1 100644 (file)
@@ -54,7 +54,7 @@ limitations under the License.
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.10</artifactId>
+      <artifactId>kafka_2.11</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
index d2ea7ae..694cf3f 100644 (file)
@@ -20,8 +20,7 @@ package org.apache.flume.channel.kafka;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import kafka.utils.ZKGroupTopicDirs;
-import kafka.utils.ZkUtils;
+import kafka.zk.KafkaZkClient;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
@@ -55,6 +54,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -63,6 +63,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -78,7 +79,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
-import static scala.collection.JavaConverters.asJavaListConverter;
 
 public class KafkaChannel extends BasicChannelSemantics {
 
@@ -102,6 +102,7 @@ public class KafkaChannel extends BasicChannelSemantics {
   private String groupId = DEFAULT_GROUP_ID;
   private String partitionHeader = null;
   private Integer staticPartitionId;
+  @Deprecated
   private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
 
   // used to indicate if a rebalance has occurred during the current transaction
@@ -312,10 +313,10 @@ public class KafkaChannel extends BasicChannelSemantics {
   }
 
   private void migrateOffsets() {
-    ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
-        JaasUtils.isZkSecurityEnabled());
-    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
-    try {
+    try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
+            JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
+            Time.SYSTEM, "kafka.server", "SessionExpireListener");
+         KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
       Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer);
       if (!kafkaOffsets.isEmpty()) {
         logger.info("Found Kafka offsets for topic {}. Will not migrate from zookeeper", topicStr);
@@ -324,7 +325,8 @@ public class KafkaChannel extends BasicChannelSemantics {
       }
 
       logger.info("No Kafka offsets found. Migrating zookeeper offsets");
-      Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets = getZookeeperOffsets(zkUtils);
+      Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets =
+              getZookeeperOffsets(zkClient, consumer);
       if (zookeeperOffsets.isEmpty()) {
         logger.warn("No offsets to migrate found in Zookeeper");
         return;
@@ -339,12 +341,10 @@ public class KafkaChannel extends BasicChannelSemantics {
       if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
         throw new FlumeException("Offsets could not be committed");
       }
-    } finally {
-      zkUtils.close();
-      consumer.close();
     }
   }
 
+
   private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
       KafkaConsumer<String, byte[]> client) {
     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
@@ -359,18 +359,17 @@ public class KafkaChannel extends BasicChannelSemantics {
     return offsets;
   }
 
-  private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client) {
+  private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(
+          KafkaZkClient zkClient, KafkaConsumer<String, byte[]> consumer) {
     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-    ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
-    List<String> partitions = asJavaListConverter(
-        client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
-    for (String partition : partitions) {
-      TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
-      Option<String> data = client.readDataMaybeNull(
-          topicDirs.consumerOffsetDir() + "/" + partition)._1();
-      if (data.isDefined()) {
-        Long offset = Long.valueOf(data.get());
-        offsets.put(key, new OffsetAndMetadata(offset));
+    List<PartitionInfo> partitions = consumer.partitionsFor(topicStr);
+    for (PartitionInfo partition : partitions) {
+      TopicPartition topicPartition = new TopicPartition(topicStr, partition.partition());
+      Option<Object> optionOffset = zkClient.getConsumerOffset(groupId, topicPartition);
+      if (optionOffset.nonEmpty()) {
+        Long offset = (Long) optionOffset.get();
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset);
+        offsets.put(topicPartition, offsetAndMetadata);
       }
     }
     return offsets;
@@ -690,7 +689,7 @@ public class KafkaChannel extends BasicChannelSemantics {
     private void poll() {
       logger.trace("Polling with timeout: {}ms channel-{}", pollTimeout, getName());
       try {
-        records = consumer.poll(pollTimeout);
+        records = consumer.poll(Duration.ofMillis(pollTimeout));
         recordIterator = records.iterator();
         logger.debug("{} returned {} records from last poll", getName(), records.count());
       } catch (WakeupException e) {
index 9f139d6..e1279a3 100644 (file)
@@ -19,8 +19,6 @@
 package org.apache.flume.channel.kafka;
 
 import com.google.common.collect.Lists;
-import kafka.admin.AdminUtils;
-import kafka.utils.ZkUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -39,7 +37,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CyclicBarrier;
@@ -48,6 +45,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
 
@@ -91,21 +89,11 @@ public class TestKafkaChannelBase {
   }
 
   static void createTopic(String topicName, int numPartitions) {
-    int sessionTimeoutMs = 10000;
-    int connectionTimeoutMs = 10000;
-    ZkUtils zkUtils =
-        ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
-    int replicationFactor = 1;
-    Properties topicConfig = new Properties();
-    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
+    testUtil.createTopics(Collections.singletonList(topicName), numPartitions);
   }
 
   static void deleteTopic(String topicName) {
-    int sessionTimeoutMs = 10000;
-    int connectionTimeoutMs = 10000;
-    ZkUtils zkUtils =
-        ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
-    AdminUtils.deleteTopic(zkUtils, topicName);
+    testUtil.deleteTopic(topicName);
   }
 
   KafkaChannel startChannel(boolean parseAsFlume) throws Exception {
@@ -121,6 +109,7 @@ public class TestKafkaChannelBase {
     context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
     context.put(PARSE_AS_FLUME_EVENT, String.valueOf(parseAsFlume));
     context.put(TOPIC_CONFIG, topic);
+    context.put(KAFKA_CONSUMER_PREFIX + "max.poll.interval.ms", "10000");
 
     return context;
   }
index 47c583a..2362c0d 100644 (file)
@@ -18,8 +18,7 @@
  */
 package org.apache.flume.channel.kafka;
 
-import kafka.utils.ZKGroupTopicDirs;
-import kafka.utils.ZkUtils;
+import kafka.zk.KafkaZkClient;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
@@ -30,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.utils.Time;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -97,7 +97,7 @@ public class TestOffsetsAndMigration extends TestKafkaChannelBase {
   }
 
   private Event takeEventWithoutCommittingTxn(KafkaChannel channel) {
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < 10; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
 
@@ -142,14 +142,13 @@ public class TestOffsetsAndMigration extends TestKafkaChannelBase {
 
     // Commit 10th offset to zookeeper
     if (hasZookeeperOffsets) {
-      ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), 30000, 30000,
-          JaasUtils.isZkSecurityEnabled());
-      ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(group, topic);
-      // we commit the tenth offset to ensure some data is missed.
+      KafkaZkClient zkClient = KafkaZkClient.apply(testUtil.getZkUrl(),
+              JaasUtils.isZkSecurityEnabled(), 30000, 30000, 10, Time.SYSTEM,
+              "kafka.server", "SessionExpireListener");
+      zkClient.getConsumerOffset(group, new TopicPartition(topic, 0));
       Long offset = tenthOffset + 1;
-      zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir() + "/0", offset.toString(),
-          zkUtils.updatePersistentPath$default$3());
-      zkUtils.close();
+      zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset);
+      zkClient.close();
     }
 
     // Commit 5th offset to kafka
index a983089..c6d947a 100644 (file)
@@ -1448,7 +1448,7 @@ Kafka Source
 
 Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics.
 If you have multiple Kafka sources running, you can configure them with the same Consumer Group
-so each will read a unique set of partitions for the topics.
+so each will read a unique set of partitions for the topics. This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest avilable version at the time of the release.
 
 ==================================  ===========  ===================================================
 Property Name                       Default      Description
@@ -1481,12 +1481,6 @@ topicHeader                         topic        Defines the name of the header
                                                  from, if the ``setTopicHeader`` property is set to ``true``. Care should be taken if combining
                                                  with the Kafka Sink ``topicHeader`` property so as to avoid sending the message back to the same
                                                  topic in a loop.
-migrateZookeeperOffsets             true         When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
-                                                 This should be true to support seamless Kafka client migration from older versions of Flume.
-                                                 Once migrated this can be set to false, though that should generally not be required.
-                                                 If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset
-                                                 defines how offsets are handled.
-                                                 Check `Kafka documentation <http://kafka.apache.org/documentation.html#newconsumerconfigs>`_ for details
 kafka.consumer.security.protocol    PLAINTEXT    Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
 *more consumer security props*                   If using SASL_PLAINTEXT, SASL_SSL or SSL refer to `Kafka security <http://kafka.apache.org/documentation.html#security>`_ for additional
                                                  properties that need to be set on consumer.
@@ -1504,14 +1498,21 @@ Other Kafka Consumer Properties     --           These properties are used to co
 
 Deprecated Properties
 
-===============================  ===================  =============================================================================================
+===============================  ===================  ================================================================================================
 Property Name                    Default              Description
-===============================  ===================  =============================================================================================
+===============================  ===================  ================================================================================================
 topic                            --                   Use kafka.topics
 groupId                          flume                Use kafka.consumer.group.id
 zookeeperConnect                 --                   Is no longer supported by kafka consumer client since 0.9.x. Use kafka.bootstrap.servers
                                                       to establish connection with kafka cluster
-===============================  ===================  =============================================================================================
+migrateZookeeperOffsets          true                 When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
+                                                      This should be true to support seamless Kafka client migration from older versions of Flume.
+                                                      Once migrated this can be set to false, though that should generally not be required.
+                                                      If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset
+                                                      defines how offsets are handled.
+                                                      Check `Kafka documentation <http://kafka.apache.org/documentation.html#newconsumerconfigs>`_
+                                                      for details
+===============================  ===================  ================================================================================================
 
 Example for topic subscription by comma-separated topic list.
 
@@ -3131,9 +3132,9 @@ Kafka Sink
 This is a Flume Sink implementation that can publish data to a
 `Kafka <http://kafka.apache.org/>`_ topic. One of the objective is to integrate Flume
 with Kafka so that pull based processing systems can process the data coming
-through various Flume sources. This currently supports Kafka 0.9.x series of releases.
+through various Flume sources.
 
-This version of Flume no longer supports Older Versions (0.8.x) of Kafka.
+This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest avilable version at the time of the release.
 
 Required properties are marked in bold font.
 
@@ -3538,9 +3539,7 @@ The Kafka channel can be used for multiple scenarios:
 #. With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps
 #. With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr
 
-
-This version of Flume requires Kafka version 0.9 or greater due to the reliance on the Kafka clients shipped with that version. The configuration of
-the channel has changed compared to previous flume versions.
+This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1 that was the highest avilable version at the time of the release.
 
 The configuration parameters are organized as such:
 
@@ -3570,10 +3569,6 @@ parseAsFlumeEvent                        true                        Expecting A
                                                                      This should be true if Flume source is writing to the channel and false if other producers are
                                                                      writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using
                                                                      org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact
-migrateZookeeperOffsets                  true                        When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
-                                                                     This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set
-                                                                     to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset
-                                                                     configuration defines how offsets are handled.
 pollTimeout                              500                         The amount of time(in milliseconds) to wait in the "poll()" call of the consumer.
                                                                      https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)
 defaultPartitionId                       --                          Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless
@@ -3598,17 +3593,20 @@ kafka.consumer.security.protocol         PLAINTEXT                   Same as kaf
 
 Deprecated Properties
 
-================================  ==========================  ===============================================================================================================
+================================  ==========================  ============================================================================================================================
 Property Name                     Default                     Description
-================================  ==========================  ===============================================================================================================
+================================  ==========================  ============================================================================================================================
 brokerList                        --                          List of brokers in the Kafka cluster used by the channel
                                                               This can be a partial list of brokers, but we recommend at least two for HA.
                                                               The format is comma separated list of hostname:port
 topic                             flume-channel               Use kafka.topic
 groupId                           flume                       Use kafka.consumer.group.id
 readSmallestOffset                false                       Use kafka.consumer.auto.offset.reset
-
-================================  ==========================  ===============================================================================================================
+migrateZookeeperOffsets           true                        When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
+                                                              This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set
+                                                              to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset
+                                                              configuration defines how offsets are handled.
+================================  ==========================  ============================================================================================================================
 
 .. note:: Due to the way the channel is load balanced, there may be duplicate events when the agent first starts up
 
index f9f10ae..eb65500 100644 (file)
@@ -84,7 +84,7 @@
 
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.10</artifactId>
+      <artifactId>kafka_2.11</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
index 92151cb..5a94c82 100644 (file)
 package org.apache.flume.sink.kafka;
 
 import com.google.common.base.Charsets;
-
-import kafka.admin.AdminUtils;
-import kafka.message.MessageAndMetadata;
-import kafka.utils.ZkUtils;
-
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
@@ -45,6 +40,8 @@ import org.apache.flume.shared.kafka.test.PartitionTestScenario;
 import org.apache.flume.sink.kafka.util.TestUtil;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -54,8 +51,8 @@ import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -75,7 +72,8 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
 import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -83,7 +81,7 @@ import static org.junit.Assert.fail;
  */
 public class TestKafkaSink {
 
-  private static TestUtil testUtil = TestUtil.getInstance();
+  private static final TestUtil testUtil = TestUtil.getInstance();
   private final Set<String> usedTopics = new HashSet<String>();
 
   @BeforeClass
@@ -178,9 +176,15 @@ public class TestKafkaSink {
       // ignore
     }
 
-    String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC)
-                                                    .message());
-    assertEquals(msg, fetchedMsg);
+    checkMessageArrived(msg, DEFAULT_TOPIC);
+  }
+
+  private void checkMessageArrived(String msg, String topic) {
+    ConsumerRecords recs = pollConsumerRecords(topic);
+    assertNotNull(recs);
+    assertTrue(recs.count() > 0);
+    ConsumerRecord consumerRecord = (ConsumerRecord) recs.iterator().next();
+    assertEquals(msg, consumerRecord.value());
   }
 
   @Test
@@ -199,13 +203,11 @@ public class TestKafkaSink {
       // ignore
     }
 
-    String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer(
-        TestConstants.STATIC_TOPIC).message());
-    assertEquals(msg, fetchedMsg);
+    checkMessageArrived(msg, TestConstants.STATIC_TOPIC);
   }
 
   @Test
-  public void testTopicAndKeyFromHeader() throws UnsupportedEncodingException {
+  public void testTopicAndKeyFromHeader() {
     Sink kafkaSink = new KafkaSink();
     Context context = prepareDefaultContext();
     Configurables.configure(kafkaSink, context);
@@ -234,21 +236,15 @@ public class TestKafkaSink {
       // ignore
     }
 
-    MessageAndMetadata fetchedMsg =
-        testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
-
-    assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8"));
-    assertEquals(TestConstants.CUSTOM_KEY,
-                 new String((byte[]) fetchedMsg.key(), "UTF-8"));
+    checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC);
   }
 
   /**
    * Tests that a message will be produced to a topic as specified by a
    * custom topicHeader parameter (FLUME-3046).
-   * @throws UnsupportedEncodingException
    */
   @Test
-  public void testTopicFromConfHeader() throws UnsupportedEncodingException {
+  public void testTopicFromConfHeader() {
     String customTopicHeader = "customTopicHeader";
     Sink kafkaSink = new KafkaSink();
     Context context = prepareDefaultContext();
@@ -278,19 +274,15 @@ public class TestKafkaSink {
       // ignore
     }
 
-    MessageAndMetadata<?, ?> fetchedMsg =
-        testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
-
-    assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8"));
+    checkMessageArrived(msg, TestConstants.CUSTOM_TOPIC);
   }
 
   /**
    * Tests that the topicHeader parameter will be ignored if the allowTopicHeader
    * parameter is set to false (FLUME-3046).
-   * @throws UnsupportedEncodingException
    */
   @Test
-  public void testTopicNotFromConfHeader() throws UnsupportedEncodingException {
+  public void testTopicNotFromConfHeader() {
     Sink kafkaSink = new KafkaSink();
     Context context = prepareDefaultContext();
     context.put(KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER, "false");
@@ -322,14 +314,12 @@ public class TestKafkaSink {
       // ignore
     }
 
-    MessageAndMetadata<?, ?> fetchedMsg =
-        testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC);
-
-    assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8"));
+    checkMessageArrived(msg, DEFAULT_TOPIC);
   }
 
   @Test
-  public void testReplaceSubStringOfTopicWithHeaders() throws UnsupportedEncodingException {
+  public void testReplaceSubStringOfTopicWithHeaders() {
+    String topic = TestConstants.HEADER_1_VALUE + "-topic";
     Sink kafkaSink = new KafkaSink();
     Context context = prepareDefaultContext();
     context.put(TOPIC_CONFIG, TestConstants.HEADER_TOPIC);
@@ -358,10 +348,7 @@ public class TestKafkaSink {
       // ignore
     }
 
-    String fetchedMsg = new String((byte[])
-        testUtil.getNextMessageFromConsumer(TestConstants.HEADER_1_VALUE + "-topic").message());
-
-    assertEquals(msg, fetchedMsg);
+    checkMessageArrived(msg, topic);
   }
 
   @SuppressWarnings("rawtypes")
@@ -398,12 +385,15 @@ public class TestKafkaSink {
       // ignore
     }
 
-    MessageAndMetadata fetchedMsg = testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
+    String topic = TestConstants.CUSTOM_TOPIC;
 
-    ByteArrayInputStream in = new ByteArrayInputStream((byte[]) fetchedMsg.message());
+    ConsumerRecords<String, String> recs = pollConsumerRecords(topic);
+    assertNotNull(recs);
+    assertTrue(recs.count() > 0);
+    ConsumerRecord<String, String> consumerRecord = recs.iterator().next();
+    ByteArrayInputStream in = new ByteArrayInputStream(consumerRecord.value().getBytes());
     BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
-    SpecificDatumReader<AvroFlumeEvent> reader =
-        new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class);
+    SpecificDatumReader<AvroFlumeEvent> reader = new SpecificDatumReader<>(AvroFlumeEvent.class);
 
     AvroFlumeEvent avroevent = reader.read(null, decoder);
 
@@ -411,15 +401,33 @@ public class TestKafkaSink {
     Map<CharSequence, CharSequence> eventHeaders = avroevent.getHeaders();
 
     assertEquals(msg, eventBody);
-    assertEquals(TestConstants.CUSTOM_KEY, new String((byte[]) fetchedMsg.key(), "UTF-8"));
+    assertEquals(TestConstants.CUSTOM_KEY, consumerRecord.key());
 
     assertEquals(TestConstants.HEADER_1_VALUE,
                  eventHeaders.get(new Utf8(TestConstants.HEADER_1_KEY)).toString());
     assertEquals(TestConstants.CUSTOM_KEY, eventHeaders.get(new Utf8("key")).toString());
   }
 
+  private ConsumerRecords<String, String> pollConsumerRecords(String topic) {
+    return pollConsumerRecords(topic, 20);
+  }
+
+  private ConsumerRecords<String, String> pollConsumerRecords(String topic, int maxIter) {
+    ConsumerRecords<String, String> recs = null;
+    for (int i = 0; i < maxIter; i++) {
+      recs = testUtil.getNextMessageFromConsumer(topic);
+      if (recs.count() > 0) break;
+      try {
+        Thread.sleep(1000L);
+      } catch (InterruptedException e) {
+        //
+      }
+    }
+    return recs;
+  }
+
   @Test
-  public void testEmptyChannel() throws UnsupportedEncodingException, EventDeliveryException {
+  public void testEmptyChannel() throws EventDeliveryException {
     Sink kafkaSink = new KafkaSink();
     Context context = prepareDefaultContext();
     Configurables.configure(kafkaSink, context);
@@ -432,7 +440,9 @@ public class TestKafkaSink {
     if (status != Sink.Status.BACKOFF) {
       fail("Error Occurred");
     }
-    assertNull(testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC));
+    ConsumerRecords recs = pollConsumerRecords(DEFAULT_TOPIC, 2);
+    assertNotNull(recs);
+    assertEquals(recs.count(), 0);
   }
 
   @Test
@@ -481,10 +491,9 @@ public class TestKafkaSink {
   /**
    * Tests that sub-properties (kafka.producer.*) apply correctly across multiple invocations
    * of configure() (fix for FLUME-2857).
-   * @throws Exception
    */
   @Test
-  public void testDefaultSettingsOnReConfigure() throws Exception {
+  public void testDefaultSettingsOnReConfigure() {
     String sampleProducerProp = "compression.type";
     String sampleProducerVal = "snappy";
 
@@ -515,10 +524,8 @@ public class TestKafkaSink {
    *    Expected behaviour: Exception is not thrown because the code avoids an NPE.
    *
    * 3. PartitionOption.NOTANUMBER: The partition header is set, but is not an Integer.
-   *    Expected behaviour: ChannelExeption thrown.
+   *    Expected behaviour: ChannelException thrown.
    *
-   * @param option
-   * @throws Exception
    */
   private void doPartitionErrors(PartitionOption option) throws Exception {
     doPartitionErrors(option, new KafkaSink());
@@ -577,9 +584,6 @@ public class TestKafkaSink {
    * a large skew to some partitions and then verify that this actually happened
    * by reading messages directly using a Kafka Consumer.
    *
-   * @param usePartitionHeader
-   * @param staticPtn
-   * @throws Exception
    */
   private void doPartitionHeader(PartitionTestScenario scenario) throws Exception {
     final int numPtns = 5;
@@ -680,25 +684,15 @@ public class TestKafkaSink {
     return kafkaSink.process();
   }
 
-  public static void createTopic(String topicName, int numPartitions) {
-    int sessionTimeoutMs = 10000;
-    int connectionTimeoutMs = 10000;
-    ZkUtils zkUtils =
-        ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
-    int replicationFactor = 1;
-    Properties topicConfig = new Properties();
-    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
+  private void createTopic(String topicName, int numPartitions) {
+    testUtil.createTopics(Collections.singletonList(topicName), numPartitions);
   }
 
-  public static void deleteTopic(String topicName) {
-    int sessionTimeoutMs = 10000;
-    int connectionTimeoutMs = 10000;
-    ZkUtils zkUtils =
-        ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
-    AdminUtils.deleteTopic(zkUtils, topicName);
+  private void deleteTopic(String topicName) {
+    testUtil.deleteTopic(topicName);
   }
 
-  public String findUnusedTopic() {
+  private String findUnusedTopic() {
     String newTopic = null;
     boolean topicFound = false;
     while (!topicFound) {
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaConsumer.java
deleted file mode 100644 (file)
index d5dfbd6..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- limitations under the License.
- */
-
-package org.apache.flume.sink.kafka.util;
-
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.ConsumerTimeoutException;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * A Kafka Consumer implementation. This uses the current thread to fetch the
- * next message from the queue and doesn't use a multi threaded implementation.
- * So this implements a synchronous blocking call.
- * To avoid infinite waiting, a timeout is implemented to wait only for
- * 10 seconds before concluding that the message will not be available.
- */
-public class KafkaConsumer {
-
-  private static final Logger logger = LoggerFactory.getLogger(
-      KafkaConsumer.class);
-
-  private final ConsumerConnector consumer;
-  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap;
-
-  public KafkaConsumer() {
-    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
-        createConsumerConfig(TestUtil.getInstance().getZkUrl(), "group_1"));
-  }
-
-  private static ConsumerConfig createConsumerConfig(String zkUrl,
-      String groupId) {
-    Properties props = new Properties();
-    props.put("zookeeper.connect", zkUrl);
-    props.put("group.id", groupId);
-    props.put("zookeeper.session.timeout.ms", "1000");
-    props.put("zookeeper.sync.time.ms", "200");
-    props.put("auto.commit.interval.ms", "1000");
-    props.put("auto.offset.reset", "smallest");
-    props.put("consumer.timeout.ms","1000");
-    return new ConsumerConfig(props);
-  }
-
-  public void initTopicList(List<String> topics) {
-    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-    for (String topic : topics) {
-      // we need only single threaded consumers
-      topicCountMap.put(topic, new Integer(1));
-    }
-    consumerMap = consumer.createMessageStreams(topicCountMap);
-  }
-
-  public MessageAndMetadata getNextMessage(String topic) {
-    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
-    // it has only a single stream, because there is only one consumer
-    KafkaStream stream = streams.get(0);
-    final ConsumerIterator<byte[], byte[]> it = stream.iterator();
-    int counter = 0;
-    try {
-      if (it.hasNext()) {
-        return it.next();
-      } else {
-        return null;
-      }
-    } catch (ConsumerTimeoutException e) {
-      logger.error("0 messages available to fetch for the topic " + topic);
-      return null;
-    }
-  }
-
-  public void shutdown() {
-    consumer.shutdown();
-  }
-}
index 00780fd..cdf9bad 100644 (file)
 
 package org.apache.flume.sink.kafka.util;
 
-import kafka.message.MessageAndMetadata;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
-import java.net.UnknownHostException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A utility class for starting/stopping Kafka Server.
@@ -35,13 +45,17 @@ import java.util.Properties;
 public class TestUtil {
 
   private static final Logger logger = LoggerFactory.getLogger(TestUtil.class);
-  private static TestUtil instance = new TestUtil();
+  private static final TestUtil instance = new TestUtil();
 
   private KafkaLocal kafkaServer;
-  private KafkaConsumer kafkaConsumer;
-  private String hostname = "localhost";
+  private boolean externalServers = true;
+  private String kafkaServerUrl;
+  private String zkServerUrl;
   private int kafkaLocalPort;
+  private Properties clientProps;
   private int zkLocalPort;
+  private KafkaConsumer<String, String> consumer;
+  private AdminClient adminClient;
 
   private TestUtil() {
     init();
@@ -52,16 +66,31 @@ public class TestUtil {
   }
 
   private void init() {
-    // get the localhost.
     try {
-      hostname = InetAddress.getLocalHost().getHostName();
-    } catch (UnknownHostException e) {
-      logger.warn("Error getting the value of localhost. " +
-          "Proceeding with 'localhost'.", e);
+      Properties settings = new Properties();
+      InputStream in = Class.class.getResourceAsStream("/testutil.properties");
+      if (in != null) {
+        settings.load(in);
+      }
+      externalServers = "true".equalsIgnoreCase(settings.getProperty("external-servers"));
+      if (externalServers) {
+        kafkaServerUrl = settings.getProperty("kafka-server-url");
+        zkServerUrl = settings.getProperty("zk-server-url");
+      } else {
+        String hostname = InetAddress.getLocalHost().getHostName();
+        zkLocalPort = getNextPort();
+        kafkaLocalPort = getNextPort();
+        kafkaServerUrl = hostname + ":" + kafkaLocalPort;
+        zkServerUrl = hostname + ":" + zkLocalPort;
+      }
+      clientProps = createClientProperties();
+    } catch (Exception e) {
+      logger.error("Unexpected error", e);
+      throw new RuntimeException("Unexpected error", e);
     }
   }
 
-  private boolean startKafkaServer() {
+  private boolean startEmbeddedKafkaServer() {
     Properties kafkaProperties = new Properties();
     Properties zkProperties = new Properties();
 
@@ -72,7 +101,6 @@ public class TestUtil {
           "/zookeeper.properties"));
 
       //start local Zookeeper
-      zkLocalPort = getNextPort();
       // override the Zookeeper client port with the generated one.
       zkProperties.setProperty("clientPort", Integer.toString(zkLocalPort));
       new ZooKeeperLocal(zkProperties);
@@ -84,12 +112,12 @@ public class TestUtil {
           "/kafka-server.properties"));
       // override the Zookeeper url.
       kafkaProperties.setProperty("zookeeper.connect", getZkUrl());
-      kafkaLocalPort = getNextPort();
       // override the Kafka server port
       kafkaProperties.setProperty("port", Integer.toString(kafkaLocalPort));
       kafkaServer = new KafkaLocal(kafkaProperties);
       kafkaServer.start();
       logger.info("Kafka Server is successfully started on port " + kafkaLocalPort);
+
       return true;
 
     } catch (Exception e) {
@@ -98,25 +126,69 @@ public class TestUtil {
     }
   }
 
-  private KafkaConsumer getKafkaConsumer() {
-    synchronized (this) {
-      if (kafkaConsumer == null) {
-        kafkaConsumer = new KafkaConsumer();
-      }
+  private AdminClient getAdminClient() {
+    if (adminClient == null) {
+      Properties adminClientProps = createAdminClientProperties();
+      adminClient = AdminClient.create(adminClientProps);
     }
-    return kafkaConsumer;
+    return adminClient;
+  }
+
+  private Properties createClientProperties() {
+    final Properties props = createAdminClientProperties();
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    props.put("auto.commit.interval.ms", "1000");
+    props.put("auto.offset.reset", "earliest");
+    props.put("consumer.timeout.ms","10000");
+    props.put("max.poll.interval.ms","10000");
+
+    // Create the consumer using props.
+    return props;
+  }
+
+  private Properties createAdminClientProperties() {
+    final Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaServerUrl());
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
+    return props;
   }
 
   public void initTopicList(List<String> topics) {
-    getKafkaConsumer().initTopicList(topics);
+    consumer = new KafkaConsumer<>(clientProps);
+    consumer.subscribe(topics);
+  }
+
+  public void createTopics(List<String> topicNames, int numPartitions) {
+    List<NewTopic> newTopics = new ArrayList<>();
+    for (String topicName: topicNames) {
+      NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
+      newTopics.add(newTopic);
+    }
+    getAdminClient().createTopics(newTopics);
+
+    //the following lines are a bit of black magic to ensure the topic is ready when we return
+    DescribeTopicsResult dtr = getAdminClient().describeTopics(topicNames);
+    try {
+      dtr.all().get(10, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      throw new RuntimeException("Error getting topic info", e);
+    }
+  }
+  public void deleteTopic(String topicName) {
+    getAdminClient().deleteTopics(Collections.singletonList(topicName));
   }
 
-  public MessageAndMetadata getNextMessageFromConsumer(String topic) {
-    return getKafkaConsumer().getNextMessage(topic);
+  public ConsumerRecords<String, String> getNextMessageFromConsumer(String topic) {
+    return consumer.poll(Duration.ofMillis(1000L));
   }
 
   public void prepare() {
-    boolean startStatus = startKafkaServer();
+
+    if (externalServers) {
+      return;
+    }
+    boolean startStatus = startEmbeddedKafkaServer();
     if (!startStatus) {
       throw new RuntimeException("Error starting the server!");
     }
@@ -126,21 +198,28 @@ public class TestUtil {
     } catch (InterruptedException e) {
       // ignore
     }
-    getKafkaConsumer();
     logger.info("Completed the prepare phase.");
   }
 
   public void tearDown() {
     logger.info("Shutting down the Kafka Consumer.");
-    getKafkaConsumer().shutdown();
+    if (consumer != null) {
+      consumer.close();
+    }
+    if (adminClient != null) {
+      adminClient.close();
+      adminClient = null;
+    }
     try {
       Thread.sleep(3 * 1000);   // add this sleep time to
       // ensure that the server is fully started before proceeding with tests.
     } catch (InterruptedException e) {
       // ignore
     }
-    logger.info("Shutting down the kafka Server.");
-    kafkaServer.stop();
+    if (kafkaServer != null) {
+      logger.info("Shutting down the kafka Server.");
+      kafkaServer.stop();
+    }
     logger.info("Completed the tearDown phase.");
   }
 
@@ -151,10 +230,10 @@ public class TestUtil {
   }
 
   public String getZkUrl() {
-    return hostname + ":" + zkLocalPort;
+    return zkServerUrl;
   }
 
   public String getKafkaServerUrl() {
-    return hostname + ":" + kafkaLocalPort;
+    return kafkaServerUrl;
   }
 }
index 02a81e2..2312247 100644 (file)
@@ -116,3 +116,5 @@ zookeeper.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
 zookeeper.connection.timeout.ms=1000000
+
+offsets.topic.replication.factor=1
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/testutil.properties b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/testutil.properties
new file mode 100644 (file)
index 0000000..97bbf1b
--- /dev/null
@@ -0,0 +1,3 @@
+external-servers=false
+kafka-server-url=localhost:9092
+zk-server-url=localhost:2181
index 9b5d697..8e6812e 100644 (file)
@@ -56,7 +56,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.10</artifactId>
+      <artifactId>kafka_2.11</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.zookeeper</groupId>
@@ -72,7 +72,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.10</artifactId>
+      <artifactId>kafka_2.11</artifactId>
       <classifier>test</classifier>
       <scope>test</scope>
     </dependency>
index 10b2cfb..ddffa87 100644 (file)
@@ -17,6 +17,7 @@
 package org.apache.flume.source.kafka;
 
 import java.io.ByteArrayInputStream;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -28,11 +29,12 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import kafka.cluster.Broker;
 import kafka.cluster.BrokerEndPoint;
-import kafka.utils.ZKGroupTopicDirs;
-import kafka.utils.ZkUtils;
+import kafka.zk.KafkaZkClient;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
@@ -59,16 +61,19 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
-import scala.Option;
 
 import static org.apache.flume.source.kafka.KafkaSourceConstants.*;
-import static scala.collection.JavaConverters.asJavaListConverter;
+
+import scala.Option;
+import scala.collection.JavaConverters;
 
 /**
  * A Source for Kafka which reads messages from kafka topics.
@@ -128,6 +133,7 @@ public class KafkaSource extends AbstractPollableSource
   private String zookeeperConnect;
   private String bootstrapServers;
   private String groupId = DEFAULT_GROUP_ID;
+  @Deprecated
   private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
   private String topicHeader = null;
   private boolean setTopicHeader;
@@ -189,7 +195,6 @@ public class KafkaSource extends AbstractPollableSource
   @Override
   protected Status doProcess() throws EventDeliveryException {
     final String batchUUID = UUID.randomUUID().toString();
-    byte[] kafkaMessage;
     String kafkaKey;
     Event event;
     byte[] eventBody;
@@ -206,22 +211,20 @@ public class KafkaSource extends AbstractPollableSource
         if (it == null || !it.hasNext()) {
           // Obtaining new records
           // Poll time is remainder time for current batch.
-          ConsumerRecords<String, byte[]> records = consumer.poll(
-                  Math.max(0, maxBatchEndTime - System.currentTimeMillis()));
+          long durMs = Math.max(0L, maxBatchEndTime - System.currentTimeMillis());
+          Duration duration = Duration.ofMillis(durMs);
+          ConsumerRecords<String, byte[]> records = consumer.poll(duration);
           it = records.iterator();
 
           // this flag is set to true in a callback when some partitions are revoked.
           // If there are any records we commit them.
-          if (rebalanceFlag.get()) {
-            rebalanceFlag.set(false);
+          if (rebalanceFlag.compareAndSet(true, false)) {
             break;
           }
           // check records after poll
           if (!it.hasNext()) {
-            if (log.isDebugEnabled()) {
-              counter.incrementKafkaEmptyCount();
-              log.debug("Returning with backoff. No more data to read");
-            }
+            counter.incrementKafkaEmptyCount();
+            log.debug("Returning with backoff. No more data to read");
             // batch time exceeded
             break;
           }
@@ -230,7 +233,6 @@ public class KafkaSource extends AbstractPollableSource
         // get next message
         ConsumerRecord<String, byte[]> message = it.next();
         kafkaKey = message.key();
-        kafkaMessage = message.value();
 
         if (useAvroEventFormat) {
           //Assume the event is in Avro format using the AvroFlumeEvent schema
@@ -471,18 +473,21 @@ public class KafkaSource extends AbstractPollableSource
    * Allows for backwards compatibility of the zookeeperConnect configuration.
    */
   private String lookupBootstrap(String zookeeperConnect, SecurityProtocol securityProtocol) {
-    ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
-        JaasUtils.isZkSecurityEnabled());
-    try {
-      List<BrokerEndPoint> endPoints =
-          asJavaListConverter(zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava();
+    try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
+            JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
+            Time.SYSTEM, "kafka.server", "SessionExpireListener")) {
+      List<Broker> brokerList =
+              JavaConverters.seqAsJavaListConverter(zkClient.getAllBrokersInCluster()).asJava();
+      List<BrokerEndPoint> endPoints = brokerList.stream()
+              .map(broker -> broker.brokerEndPoint(
+                  ListenerName.forSecurityProtocol(securityProtocol))
+              )
+              .collect(Collectors.toList());
       List<String> connections = new ArrayList<>();
       for (BrokerEndPoint endPoint : endPoints) {
         connections.add(endPoint.connectionString());
       }
       return StringUtils.join(connections, ',');
-    } finally {
-      zkUtils.close();
     }
   }
 
@@ -535,8 +540,6 @@ public class KafkaSource extends AbstractPollableSource
     // Subscribe for topics by already specified strategy
     subscriber.subscribe(consumer, new SourceRebalanceListener(rebalanceFlag));
 
-    // Connect to kafka. 1 second is optimal time.
-    it = consumer.poll(1000).iterator();
     log.info("Kafka source {} started.", getName());
     counter.start();
   }
@@ -547,15 +550,17 @@ public class KafkaSource extends AbstractPollableSource
       consumer.wakeup();
       consumer.close();
     }
-    counter.stop();
+    if (counter != null) {
+      counter.stop();
+    }
     log.info("Kafka Source {} stopped. Metrics: {}", getName(), counter);
   }
 
   private void migrateOffsets(String topicStr) {
-    ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
-        JaasUtils.isZkSecurityEnabled());
-    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps);
-    try {
+    try (KafkaZkClient zkClient = KafkaZkClient.apply(zookeeperConnect,
+            JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, 10,
+            Time.SYSTEM, "kafka.server", "SessionExpireListener");
+         KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps)) {
       Map<TopicPartition, OffsetAndMetadata> kafkaOffsets =
           getKafkaOffsets(consumer, topicStr);
       if (!kafkaOffsets.isEmpty()) {
@@ -567,7 +572,7 @@ public class KafkaSource extends AbstractPollableSource
 
       log.info("No Kafka offsets found. Migrating zookeeper offsets");
       Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets =
-          getZookeeperOffsets(zkUtils, topicStr);
+          getZookeeperOffsets(zkClient, consumer, topicStr);
       if (zookeeperOffsets.isEmpty()) {
         log.warn("No offsets to migrate found in Zookeeper");
         return;
@@ -583,9 +588,6 @@ public class KafkaSource extends AbstractPollableSource
       if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
         throw new FlumeException("Offsets could not be committed");
       }
-    } finally {
-      zkUtils.close();
-      consumer.close();
     }
   }
 
@@ -603,19 +605,18 @@ public class KafkaSource extends AbstractPollableSource
     return offsets;
   }
 
-  private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client,
-                                                                     String topicStr) {
+  private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(
+          KafkaZkClient zkClient, KafkaConsumer<String, byte[]> consumer, String topicStr) {
+
     Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-    ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
-    List<String> partitions = asJavaListConverter(
-        client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
-    for (String partition : partitions) {
-      TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
-      Option<String> data = client.readDataMaybeNull(
-          topicDirs.consumerOffsetDir() + "/" + partition)._1();
-      if (data.isDefined()) {
-        Long offset = Long.valueOf(data.get());
-        offsets.put(key, new OffsetAndMetadata(offset));
+    List<PartitionInfo> partitions = consumer.partitionsFor(topicStr);
+    for (PartitionInfo partition : partitions) {
+      TopicPartition topicPartition = new TopicPartition(topicStr, partition.partition());
+      Option<Object> optionOffset = zkClient.getConsumerOffset(groupId, topicPartition);
+      if (optionOffset.nonEmpty()) {
+        Long offset = (Long) optionOffset.get();
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset);
+        offsets.put(topicPartition, offsetAndMetadata);
       }
     }
     return offsets;
index 1186f6d..f4fe57d 100644 (file)
  */
 package org.apache.flume.source.kafka;
 
-import kafka.admin.AdminUtils;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -31,9 +32,12 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.ServerSocket;
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 public class KafkaSourceEmbeddedKafka {
 
@@ -41,6 +45,7 @@ public class KafkaSourceEmbeddedKafka {
 
   KafkaServerStartable kafkaServer;
   KafkaSourceEmbeddedZookeeper zookeeper;
+  private AdminClient adminClient;
 
   private static int findFreePort() {
     try (ServerSocket socket = new ServerSocket(0)) {
@@ -70,6 +75,7 @@ public class KafkaSourceEmbeddedKafka {
     props.put("host.name", "localhost");
     props.put("port", String.valueOf(serverPort));
     props.put("log.dir", dir.getAbsolutePath());
+    props.put("offsets.topic.replication.factor", "1");
     if (properties != null) {
       props.putAll(properties);
     }
@@ -132,15 +138,31 @@ public class KafkaSourceEmbeddedKafka {
   }
 
   public void createTopic(String topicName, int numPartitions) {
-    // Create a ZooKeeper client
-    int sessionTimeoutMs = 10000;
-    int connectionTimeoutMs = 10000;
-    ZkClient zkClient =
-        ZkUtils.createZkClient(HOST + ":" + zkPort, sessionTimeoutMs, connectionTimeoutMs);
-    ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
-    int replicationFactor = 1;
-    Properties topicConfig = new Properties();
-    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
+    AdminClient adminClient = getAdminClient();
+    NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) 1);
+    adminClient.createTopics(Collections.singletonList(newTopic));
+
+    //the following lines are a bit of black magic to ensure the topic is ready when we return
+    DescribeTopicsResult dtr = adminClient.describeTopics(Collections.singletonList(topicName));
+    try {
+      dtr.all().get(10, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      throw new RuntimeException("Error getting topic info", e);
+    }
+  }
+
+  private AdminClient getAdminClient() {
+    if (adminClient == null) {
+      final Properties props = new Properties();
+      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, HOST + ":" + serverPort);
+      props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
+      adminClient = AdminClient.create(props);
+    }
+    return adminClient;
+  }
+
+  public void deleteTopics(List<String> topic) {
+    getAdminClient().deleteTopics(topic);
   }
 
 }
index bb20e35..a82c972 100644 (file)
@@ -20,9 +20,8 @@ package org.apache.flume.source.kafka;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import junit.framework.Assert;
-import kafka.common.TopicExistsException;
-import kafka.utils.ZKGroupTopicDirs;
-import kafka.utils.ZkUtils;
+
+import kafka.zk.KafkaZkClient;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
@@ -37,6 +36,8 @@ import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -45,8 +46,12 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
@@ -58,13 +63,13 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT;
@@ -83,6 +88,7 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_H
 import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
@@ -92,22 +98,30 @@ public class TestKafkaSource {
   private static final Logger log = LoggerFactory.getLogger(TestKafkaSource.class);
 
   private KafkaSource kafkaSource;
-  private KafkaSourceEmbeddedKafka kafkaServer;
+  private static KafkaSourceEmbeddedKafka kafkaServer;
   private Context context;
   private List<Event> events;
 
-  private final Set<String> usedTopics = new HashSet<String>();
-  private String topic0 = "test1";
-  private String topic1 = "topic1";
+  private final List<String> usedTopics = new ArrayList<>();
+  private String topic0;
+  private String topic1;
+
+
+  @BeforeClass
+  public static void startKafkaServer() {
+    kafkaServer = new KafkaSourceEmbeddedKafka(null);
+    startupCheck();
+  }
 
   @SuppressWarnings("unchecked")
   @Before
   public void setup() throws Exception {
     kafkaSource = new KafkaSource();
-    kafkaServer = new KafkaSourceEmbeddedKafka(null);
     try {
+      topic0 = findUnusedTopic();
       kafkaServer.createTopic(topic0, 1);
       usedTopics.add(topic0);
+      topic1 = findUnusedTopic();
       kafkaServer.createTopic(topic1, 3);
       usedTopics.add(topic1);
     } catch (TopicExistsException e) {
@@ -118,6 +132,35 @@ public class TestKafkaSource {
     kafkaSource.setChannelProcessor(createGoodChannel());
   }
 
+  private static void startupCheck() {
+    String startupTopic = "startupCheck";
+    KafkaConsumer<String, String> startupConsumer;
+    kafkaServer.createTopic(startupTopic, 1);
+    final Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer.getBootstrapServers());
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+    consumer.subscribe(Collections.singletonList(startupTopic));
+    log.info("Checking Startup");
+    boolean success = false;
+    for (int i = 0; i < 20; i++) {
+      kafkaServer.produce(startupTopic, "", "record");
+      ConsumerRecords recs = consumer.poll(Duration.ofMillis(1000L));
+      if (!recs.isEmpty()) {
+        success = true;
+        break;
+      }
+    }
+    if (!success) {
+      fail("Kafka server startup failed");
+    }
+    log.info("Kafka server startup success");
+    consumer.close();
+    kafkaServer.deleteTopics(Collections.singletonList(startupTopic));
+  }
+
   private Context prepareDefaultContext(String groupId) {
     Context context = new Context();
     context.put(BOOTSTRAP_SERVERS, kafkaServer.getBootstrapServers());
@@ -127,10 +170,32 @@ public class TestKafkaSource {
 
   @After
   public void tearDown() throws Exception {
-    kafkaSource.stop();
+    try {
+      kafkaSource.stop();
+    } catch (Exception e) {
+      log.warn("Error stopping kafkaSource", e);
+    }
+    topic0 = null;
+    topic1 = null;
+    kafkaServer.deleteTopics(usedTopics);
+    usedTopics.clear();
+  }
+
+  @AfterClass
+  public static void stopKafkaServer() throws Exception {
     kafkaServer.stop();
   }
 
+  private void startKafkaSource() throws EventDeliveryException, InterruptedException {
+    kafkaSource.start();
+    /* Timing magic: We call the process method, that executes a consumer.poll()
+      A thread.sleep(10000L) does not work even though it takes longer */
+    for (int i = 0; i < 3; i++) {
+      kafkaSource.process();
+      Thread.sleep(1000);
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testOffsets() throws InterruptedException, EventDeliveryException {
@@ -140,7 +205,7 @@ public class TestKafkaSource {
             String.valueOf(batchDuration));
     context.put(BATCH_SIZE, "3");
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
     Thread.sleep(500L);
     Status status = kafkaSource.process();
     assertEquals(Status.BACKOFF, status);
@@ -190,7 +255,7 @@ public class TestKafkaSource {
     kafkaSource = new KafkaSource();
     kafkaSource.setChannelProcessor(createGoodChannel());
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
     kafkaServer.produce(topic1, "", "record14");
     Thread.sleep(1000L);
     assertEquals(Status.READY, kafkaSource.process());
@@ -209,7 +274,7 @@ public class TestKafkaSource {
     context.put(TOPICS, topic0);
     context.put(BATCH_SIZE, "1");
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
 
     Thread.sleep(500L);
 
@@ -232,7 +297,7 @@ public class TestKafkaSource {
     context.put(TOPICS, topic0);
     context.put(BATCH_SIZE,"2");
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
 
     Thread.sleep(500L);
 
@@ -258,7 +323,7 @@ public class TestKafkaSource {
           IllegalAccessException, InterruptedException {
     context.put(TOPICS, topic0);
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
     Thread.sleep(500L);
 
     Status status = kafkaSource.process();
@@ -272,7 +337,7 @@ public class TestKafkaSource {
           IllegalAccessException, InterruptedException {
     context.put(TOPICS,"faketopic");
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
     Thread.sleep(500L);
 
     Status status = kafkaSource.process();
@@ -287,7 +352,7 @@ public class TestKafkaSource {
     context.put(TOPICS, topic0);
     context.put(BOOTSTRAP_SERVERS,"blabla:666");
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
     Thread.sleep(500L);
 
     Status status = kafkaSource.process();
@@ -299,7 +364,8 @@ public class TestKafkaSource {
     context.put(TOPICS, topic0);
     context.put(BATCH_DURATION_MS, "250");
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
+    kafkaSource.process(); // timing magic
 
     Thread.sleep(500L);
 
@@ -323,7 +389,7 @@ public class TestKafkaSource {
     context.put(TOPICS, topic0);
     context.put(BATCH_SIZE, "1");
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
 
     Thread.sleep(500L);
 
@@ -334,7 +400,7 @@ public class TestKafkaSource {
     Assert.assertEquals(Status.READY, kafkaSource.process());
     kafkaSource.stop();
     Thread.sleep(500L);
-    kafkaSource.start();
+    startKafkaSource();
     Thread.sleep(500L);
     Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
   }
@@ -346,7 +412,7 @@ public class TestKafkaSource {
     context.put(BATCH_SIZE,"1");
     context.put(BATCH_DURATION_MS,"30000");
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
     Thread.sleep(500L);
 
     kafkaServer.produce(topic0, "", "hello, world");
@@ -370,7 +436,7 @@ public class TestKafkaSource {
     context.put(BATCH_SIZE,"1");
     context.put(BATCH_DURATION_MS, "30000");
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
     Thread.sleep(500L);
 
     kafkaServer.produce(topic0, "", "event 1");
@@ -393,7 +459,7 @@ public class TestKafkaSource {
     context.put(BATCH_DURATION_MS,"30000");
     context.put(KAFKA_CONSUMER_PREFIX + "enable.auto.commit", "true");
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
     Thread.sleep(500L);
 
     kafkaServer.produce(topic0, "", "event 1");
@@ -417,7 +483,7 @@ public class TestKafkaSource {
     context.put(TOPICS, topic0);
     context.put(BATCH_SIZE, "1");
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
 
     Thread.sleep(500L);
 
@@ -443,7 +509,7 @@ public class TestKafkaSource {
         .when(cp).processEventBatch(any(List.class));
     kafkaSource.setChannelProcessor(cp);
 
-    kafkaSource.start();
+    startKafkaSource();
 
     Thread.sleep(500L);
 
@@ -567,7 +633,7 @@ public class TestKafkaSource {
     context.put(BATCH_SIZE, "1");
     context.put(AVRO_EVENT, "true");
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
 
     Thread.sleep(500L);
 
@@ -699,7 +765,7 @@ public class TestKafkaSource {
   public void testTopicHeaderSet() throws InterruptedException, EventDeliveryException {
     context.put(TOPICS, topic0);
     kafkaSource.configure(context);
-    kafkaSource.start();
+    startKafkaSource();
 
     Thread.sleep(500L);
 
@@ -730,7 +796,7 @@ public class TestKafkaSource {
     context.put(KafkaSourceConstants.TOPIC_HEADER, "customTopicHeader");
     kafkaSource.configure(context);
 
-    kafkaSource.start();
+    startKafkaSource();
 
     Thread.sleep(500L);
 
@@ -761,7 +827,7 @@ public class TestKafkaSource {
     context.put(KafkaSourceConstants.SET_TOPIC_HEADER, "false");
     kafkaSource.configure(context);
 
-    kafkaSource.start();
+    startKafkaSource();
 
     Thread.sleep(500L);
 
@@ -810,14 +876,13 @@ public class TestKafkaSource {
 
     // Commit 10th offset to zookeeper
     if (hasZookeeperOffsets) {
-      ZkUtils zkUtils = ZkUtils.apply(kafkaServer.getZkConnectString(), 30000, 30000,
-          JaasUtils.isZkSecurityEnabled());
-      ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(group, topic);
-      // we commit the tenth offset to ensure some data is missed.
+      KafkaZkClient zkClient = KafkaZkClient.apply(kafkaServer.getZkConnectString(),
+              JaasUtils.isZkSecurityEnabled(), 30000, 30000, 10, Time.SYSTEM,
+              "kafka.server", "SessionExpireListener");
+      zkClient.getConsumerOffset(group, new TopicPartition(topic, 0));
       Long offset = tenthOffset + 1;
-      zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir() + "/0", offset.toString(),
-          zkUtils.updatePersistentPath$default$3());
-      zkUtils.close();
+      zkClient.setOrCreateConsumerOffset(group, new TopicPartition(topic, 0), offset);
+      zkClient.close();
     }
 
     // Commit 5th offset to kafka
@@ -832,6 +897,11 @@ public class TestKafkaSource {
     // Start the source and read some data
     source.setChannelProcessor(createGoodChannel());
     source.start();
+    for (int i = 0; i < 3; i++) {
+      source.process();
+      Thread.sleep(1000);
+    }
+
     Thread.sleep(500L);
     source.process();
     List<Integer> finals = new ArrayList<Integer>(40);
index 49751c2..9227315 100644 (file)
@@ -72,7 +72,7 @@
 
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.10</artifactId>
+      <artifactId>kafka_2.11</artifactId>
       <scope>provided</scope>
     </dependency>
     <dependency>
index b4adcd3..78e6f63 100644 (file)
@@ -21,7 +21,7 @@ package org.apache.flume.shared.kafka;
 import org.apache.flume.util.SSLUtil;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 import java.util.Properties;
 
index 6096bcf..1119152 100644 (file)
@@ -20,7 +20,7 @@ package org.apache.flume.shared.kafka;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/pom.xml b/pom.xml
index 687c471..b6b35fb 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -78,7 +78,7 @@ limitations under the License.
     <jetty.version>9.4.6.v20170531</jetty.version>
     <joda-time.version>2.9.9</joda-time.version>
     <junit.version>4.10</junit.version>
-    <kafka.version>0.9.0.1</kafka.version>
+    <kafka.version>2.0.1</kafka.version>
     <kite.version>1.0.0</kite.version>
     <hive.version>1.0.0</hive.version>
     <lifecycle-mapping.version>1.0.0</lifecycle-mapping.version>
@@ -106,7 +106,7 @@ limitations under the License.
     <rat.version>0.12</rat.version>
     <snappy-java.version>1.1.4</snappy-java.version>
     <solr-global.version>4.3.0</solr-global.version>
-    <slf4j.version>1.6.1</slf4j.version>
+    <slf4j.version>1.7.25</slf4j.version>
     <system-rules.version>1.17.0</system-rules.version>
     <thrift.version>0.9.3</thrift.version>
     <twitter4j.version>3.0.3</twitter4j.version>
@@ -1809,7 +1809,7 @@ limitations under the License.
       <!-- Dependencies of Kafka source -->
       <dependency>
         <groupId>org.apache.kafka</groupId>
-        <artifactId>kafka_2.10</artifactId>
+        <artifactId>kafka_2.11</artifactId>
         <version>${kafka.version}</version>
         <exclusions>
           <exclusion>
@@ -1824,7 +1824,7 @@ limitations under the License.
       </dependency>
       <dependency>
         <groupId>org.apache.kafka</groupId>
-        <artifactId>kafka_2.10</artifactId>
+        <artifactId>kafka_2.11</artifactId>
         <version>${kafka.version}</version>
         <classifier>test</classifier>
         <scope>test</scope>