FLUME-2989 added 2 KafkaChannel metrics
authorEndre Major <emajor@cloudera.com>
Fri, 23 Nov 2018 13:25:33 +0000 (14:25 +0100)
committerFerenc Szabo <szaboferee@apache.org>
Fri, 23 Nov 2018 13:25:33 +0000 (14:25 +0100)
KafkaChannel was missing some metrics:
  eventTakeAttemptCount, eventPutAttemptCount

This PR is based on the patch included in the issue that was the work
of Umesh Chaudhary.
I reworked the test a bit to use Mockito, and made some other minor
modifications to the test.

This closes #244

Reviewers: Peter Turcsanyi, Ferenc Szabo

(Endre Major via Ferenc Szabo)

flume-ng-channels/flume-kafka-channel/pom.xml
flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java

index f9211b2..926237d 100644 (file)
@@ -66,6 +66,11 @@ limitations under the License.
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
 
   </dependencies>
 
index 694cf3f..40494d4 100644 (file)
@@ -454,6 +454,7 @@ public class KafkaChannel extends BasicChannelSemantics {
               new ProducerRecord<String, byte[]>(topic.get(), key,
                                                  serializeValue(event, parseAsFlumeEvent)));
         }
+        counter.incrementEventPutAttemptCount();
       } catch (NumberFormatException e) {
         throw new ChannelException("Non integer partition id specified", e);
       } catch (Exception e) {
@@ -518,6 +519,7 @@ public class KafkaChannel extends BasicChannelSemantics {
           } else {
             return null;
           }
+          counter.incrementEventTakeAttemptCount();
         } catch (Exception ex) {
           logger.warn("Error while getting events from Kafka. This is usually caused by " +
                       "trying to read a non-flume event. Ensure the setting for " +
index 4ff0ee6..d119b42 100644 (file)
@@ -20,13 +20,16 @@ package org.apache.flume.channel.kafka;
 
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -210,4 +213,36 @@ public class TestBasicFunctionality extends TestKafkaChannelBase {
     underlying.shutdownNow();
     verify(eventsPulled);
   }
+
+  @Test
+  public void testMetricsCount() throws Exception {
+    final KafkaChannel channel = startChannel(true);
+    ExecutorService underlying = Executors.newCachedThreadPool();
+    ExecutorCompletionService<Void> submitterSvc = new ExecutorCompletionService<Void>(underlying);
+    final List<List<Event>> events = createBaseList();
+    putEvents(channel, events, submitterSvc);
+    takeEventsWithCommittingTxn(channel,50);
+
+    KafkaChannelCounter counter =
+            (KafkaChannelCounter) Whitebox.getInternalState(channel, "counter");
+    Assert.assertEquals(50, counter.getEventPutAttemptCount());
+    Assert.assertEquals(50, counter.getEventPutSuccessCount());
+    Assert.assertEquals(50, counter.getEventTakeAttemptCount());
+    Assert.assertEquals(50, counter.getEventTakeSuccessCount());
+    channel.stop();
+  }
+
+  private void takeEventsWithCommittingTxn(KafkaChannel channel, long eventsCount) {
+    List<Event> takeEventsList = new ArrayList<>();
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    while (takeEventsList.size() < eventsCount) {
+      Event event = channel.take();
+      if (event != null) {
+        takeEventsList.add(event);
+      }
+    }
+    txn.commit();
+    txn.close();
+  }
 }