AMBARI-22343. Add ability in AMS to tee metrics to a set of configured Kafka brokers...
author: Siddharth Wagle <swagle@hortonworks.com>
Tue, 31 Oct 2017 18:34:45 +0000 (11:34 -0700)
committer: avijayanhwx <avijayan@hortonworks.com>
Sun, 1 Apr 2018 19:13:52 +0000 (12:13 -0700)
ambari-metrics-anomaly-detection-service/pom.xml
ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
ambari-metrics-timelineservice/pom.xml
ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java [new file with mode: 0644]
ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java

index 554d026..e96e957 100644 (file)
       <artifactId>spark-mllib_${scala.binary.version}</artifactId>
       <version>${spark.version}</version>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <version>${jackson.version}</version>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>4.12</version>
index 9ca9e95..bd88d57 100644 (file)
@@ -2,9 +2,6 @@ server:
   applicationConnectors:
    - type: http
      port: 9999
-  adminConnectors:
-    - type: http
-      port: 9990
   requestLog:
     type: external
 
index 3d119f9..7794a11 100644 (file)
@@ -80,7 +80,8 @@
               <outputDirectory>${project.build.directory}/lib</outputDirectory>
               <includeScope>compile</includeScope>
               <excludeScope>test</excludeScope>
-              <excludeArtifactIds>jasper-runtime,jasper-compiler</excludeArtifactIds>
+              <excludeArtifactIds>jasper-runtime,jasper-compiler
+              </excludeArtifactIds>
             </configuration>
           </execution>
         </executions>
                 <source>
                   <location>target/lib</location>
                   <excludes>
-                  <exclude>*tests.jar</exclude>
+                    <exclude>*tests.jar</exclude>
                   </excludes>
                 </source>
                 <source>
-                  <location>${project.build.directory}/${project.artifactId}-${project.version}.jar</location>
+                  <location>
+                    ${project.build.directory}/${project.artifactId}-${project.version}.jar
+                  </location>
                 </source>
               </sources>
             </mapping>
                   <location>conf/unix/amshbase_metrics_whitelist</location>
                 </source>
                 <source>
-                  <location>target/embedded/${hbase.folder}/conf/hbase-site.xml</location>
+                  <location>
+                    target/embedded/${hbase.folder}/conf/hbase-site.xml
+                  </location>
                 </source>
               </sources>
             </mapping>
           <skip>true</skip>
           <attach>false</attach>
           <submodules>false</submodules>
-          <controlDir>${project.basedir}/../src/main/package/deb/control</controlDir>
+          <controlDir>${project.basedir}/../src/main/package/deb/control
+          </controlDir>
         </configuration>
       </plugin>
     </plugins>
       <scope>test</scope>
       <classifier>tests</classifier>
     </dependency>
-      <dependency>
-        <groupId>org.apache.hbase</groupId>
-        <artifactId>hbase-testing-util</artifactId>
-        <version>${hbase.version}</version>
-        <scope>test</scope>
-        <optional>true</optional>
-        <exclusions>
-          <exclusion>
-            <groupId>org.jruby</groupId>
-            <artifactId>jruby-complete</artifactId>
-          </exclusion>
-          <exclusion>
-            <artifactId>zookeeper</artifactId>
-            <groupId>org.apache.zookeeper</groupId>
-          </exclusion>
-        </exclusions>
-      </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>${hbase.version}</version>
+      <scope>test</scope>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+        <exclusion>
+          <artifactId>zookeeper</artifactId>
+          <groupId>org.apache.zookeeper</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.11.0.1</version>
+    </dependency>
+
     <dependency>
       <groupId>org.powermock</groupId>
       <artifactId>powermock-module-junit4</artifactId>
                 </goals>
                 <configuration>
                   <target name="Download HBase">
-                    <mkdir dir="${project.build.directory}/embedded" />
+                    <mkdir dir="${project.build.directory}/embedded"/>
                     <get
-                      src="${hbase.tar}"
-                      dest="${project.build.directory}/embedded/hbase.tar.gz"
-                      usetimestamp="true"
-                      />
+                        src="${hbase.tar}"
+                        dest="${project.build.directory}/embedded/hbase.tar.gz"
+                        usetimestamp="true"
+                    />
                     <untar
-                      src="${project.build.directory}/embedded/hbase.tar.gz"
-                      dest="${project.build.directory}/embedded"
-                      compression="gzip"
-                      />
+                        src="${project.build.directory}/embedded/hbase.tar.gz"
+                        dest="${project.build.directory}/embedded"
+                        compression="gzip"
+                    />
                   </target>
                 </configuration>
               </execution>
                   <target name="Download Phoenix">
                     <mkdir dir="${project.build.directory}/embedded"/>
                     <get
-                      src="${phoenix.tar}"
-                      dest="${project.build.directory}/embedded/phoenix.tar.gz"
-                      usetimestamp="true"
-                      />
+                        src="${phoenix.tar}"
+                        dest="${project.build.directory}/embedded/phoenix.tar.gz"
+                        usetimestamp="true"
+                    />
                     <untar
-                      src="${project.build.directory}/embedded/phoenix.tar.gz"
-                      dest="${project.build.directory}/embedded"
-                      compression="gzip"
-                      />
+                        src="${project.build.directory}/embedded/phoenix.tar.gz"
+                        dest="${project.build.directory}/embedded"
+                        compression="gzip"
+                    />
                     <move
-                      file="${project.build.directory}/embedded/${phoenix.folder}/phoenix-${phoenix.version}-server.jar"
-                      tofile="${project.build.directory}/embedded/${hbase.folder}/lib/phoenix-${phoenix.version}-server.jar"
-                      />
+                        file="${project.build.directory}/embedded/${phoenix.folder}/phoenix-${phoenix.version}-server.jar"
+                        tofile="${project.build.directory}/embedded/${hbase.folder}/lib/phoenix-${phoenix.version}-server.jar"
+                    />
                   </target>
                 </configuration>
               </execution>
                 </goals>
                 <configuration>
                   <target name="Download HBase">
-                    <mkdir dir="${project.build.directory}/embedded" />
+                    <mkdir dir="${project.build.directory}/embedded"/>
                     <get
-                      src="${hbase.winpkg.zip}"
-                      dest="${project.build.directory}/embedded/hbase.zip"
-                      usetimestamp="true"
-                      />
+                        src="${hbase.winpkg.zip}"
+                        dest="${project.build.directory}/embedded/hbase.zip"
+                        usetimestamp="true"
+                    />
                     <unzip
-                      src="${project.build.directory}/embedded/hbase.zip"
-                      dest="${project.build.directory}/embedded/hbase.temp"
-                      />
+                        src="${project.build.directory}/embedded/hbase.zip"
+                        dest="${project.build.directory}/embedded/hbase.temp"
+                    />
                     <unzip
-                      src="${project.build.directory}/embedded/hbase.temp/resources/${hbase.winpkg.folder}.zip"
-                      dest="${project.build.directory}/embedded"
-                      />
+                        src="${project.build.directory}/embedded/hbase.temp/resources/${hbase.winpkg.folder}.zip"
+                        dest="${project.build.directory}/embedded"
+                    />
                     <copy
-                      file="${project.build.directory}/embedded/hbase.temp/resources/servicehost.exe"
-                      tofile="${project.build.directory}/embedded/${hbase.winpkg.folder}/bin/ams_hbase_master.exe"
-                      />
+                        file="${project.build.directory}/embedded/hbase.temp/resources/servicehost.exe"
+                        tofile="${project.build.directory}/embedded/${hbase.winpkg.folder}/bin/ams_hbase_master.exe"
+                    />
                   </target>
                 </configuration>
               </execution>
             <!-- The configuration of the plugin -->
             <configuration>
               <!-- Configuration of the archiver -->
-              <finalName>${project.artifactId}-simulator-${project.version}</finalName>
+              <finalName>${project.artifactId}-simulator-${project.version}
+              </finalName>
               <archive>
                 <!-- Manifest specific configuration -->
                 <manifest>
index f8d31f7..65b4614 100644 (file)
@@ -120,7 +120,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
@@ -211,7 +210,7 @@ public class PhoenixHBaseAccessor {
   private HashMap<String, String> tableTTL = new HashMap<>();
 
   private final TimelineMetricConfiguration configuration;
-  private InternalMetricsSource rawMetricsSource;
+  private List<InternalMetricsSource> rawMetricsSources;
 
   public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) {
     this(TimelineMetricConfiguration.getInstance(), dataSource);
@@ -278,15 +277,17 @@ public class PhoenixHBaseAccessor {
       LOG.info("Initialized aggregator sink class " + metricSinkClass);
     }
 
-    ExternalSinkProvider externalSinkProvider = configuration.getExternalSinkProvider();
+    List<ExternalSinkProvider> externalSinkProviderList = configuration.getExternalSinkProviderList();
     InternalSourceProvider internalSourceProvider = configuration.getInternalSourceProvider();
-    if (externalSinkProvider != null) {
-      ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
-      int interval = configuration.getExternalSinkInterval(RAW_METRICS);
-      if (interval == -1){
-        interval = cacheCommitInterval;
+    if (!externalSinkProviderList.isEmpty()) {
+      for (ExternalSinkProvider externalSinkProvider : externalSinkProviderList) {
+        ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
+        int interval = configuration.getExternalSinkInterval(externalSinkProvider.getClass().getSimpleName(), RAW_METRICS);
+        if (interval == -1) {
+          interval = cacheCommitInterval;
+        }
+        rawMetricsSources.add(internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink));
       }
-      rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink);
     }
     TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(this.metadataManagerInstance);
   }
@@ -303,8 +304,10 @@ public class PhoenixHBaseAccessor {
     }
     if (metricsList.size() > 0) {
       commitMetrics(metricsList);
-      if (rawMetricsSource != null) {
-        rawMetricsSource.publishTimelineMetrics(metricsList);
+      if (!rawMetricsSources.isEmpty()) {
+        for (InternalMetricsSource rawMetricsSource : rawMetricsSources) {
+          rawMetricsSource.publishTimelineMetrics(metricsList);
+        }
       }
     }
   }
@@ -316,10 +319,8 @@ public class PhoenixHBaseAccessor {
   private void commitAnomalyMetric(Connection conn, TimelineMetric metric) {
     PreparedStatement metricRecordStmt = null;
     try {
-
       Map<String, String> metricMetadata = metric.getMetadata();
-
-
+      
       byte[] uuid = metadataManagerInstance.getUuid(metric);
       if (uuid == null) {
         LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
index 929fc8c..395ec7b 100644 (file)
@@ -26,6 +26,7 @@ import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -51,8 +52,6 @@ import org.apache.log4j.Logger;
  * Configuration class that reads properties from ams-site.xml. All values
  * for time or intervals are given in seconds.
  */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
 public class TimelineMetricConfiguration {
   private static final Log LOG = LogFactory.getLog(TimelineMetricConfiguration.class);
 
@@ -343,14 +342,22 @@ public class TimelineMetricConfiguration {
   public static final String TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS = "timeline.metrics.collector.ignite.nodes.backups";
 
   public static final String INTERNAL_CACHE_HEAP_PERCENT =
-    "timeline.metrics.service.cache.%s.heap.percent";
+    "timeline.metrics.internal.cache.%s.heap.percent";
 
   public static final String EXTERNAL_SINK_INTERVAL =
-    "timeline.metrics.service.external.sink.%s.interval";
+    "timeline.metrics.external.sink.%s.%s.interval";
 
   public static final String DEFAULT_EXTERNAL_SINK_DIR =
-    "timeline.metrics.service.external.sink.dir";
-
+    "timeline.metrics.external.sink.dir";
+
+  public static final String KAFKA_SERVERS = "timeline.metrics.external.sink.kafka.bootstrap.servers";
+  public static final String KAFKA_ACKS = "timeline.metrics.external.sink.kafka.acks";
+  public static final String KAFKA_RETRIES = "timeline.metrics.external.sink.kafka.bootstrap.retries";
+  public static final String KAFKA_BATCH_SIZE = "timeline.metrics.external.sink.kafka.batch.size";
+  public static final String KAFKA_LINGER_MS = "timeline.metrics.external.sink.kafka.linger.ms";
+  public static final String KAFKA_BUFFER_MEM = "timeline.metrics.external.sink.kafka.buffer.memory";
+  public static final String KAFKA_SINK_TIMEOUT_SECONDS = "timeline.metrics.external.sink.kafka.timeout.seconds";
+  
   private Configuration hbaseConf;
   private Configuration metricsConf;
   private Configuration metricsSslConf;
@@ -601,8 +608,24 @@ public class TimelineMetricConfiguration {
     return false;
   }
 
-  public int getExternalSinkInterval(SOURCE_NAME sourceName) {
-    return Integer.parseInt(metricsConf.get(String.format(EXTERNAL_SINK_INTERVAL, sourceName), "-1"));
+  /**
+   * Get the sink interval for a metrics source.
+   * Determines how often the metrics will be written to the sink.
+   * This determines whether any caching will be needed on the collector
+   * side, default interval disables caching by writing at the same time as
+   * we get data.
+   *
+   * @param sinkProviderClassName Simple name of your implementation of {@link ExternalSinkProvider}
+   * @param sourceName {@link SOURCE_NAME}
+   * @return seconds
+   */
+  public int getExternalSinkInterval(String sinkProviderClassName,
+                                     SOURCE_NAME sourceName) {
+    String sinkProviderSimpleClassName = sinkProviderClassName.substring(
+      sinkProviderClassName.lastIndexOf(".") + 1);
+
+    return Integer.parseInt(metricsConf.get(
+      String.format(EXTERNAL_SINK_INTERVAL, sinkProviderSimpleClassName, sourceName), "-1"));
   }
 
   public InternalSourceProvider getInternalSourceProvider() {
@@ -612,12 +635,18 @@ public class TimelineMetricConfiguration {
     return ReflectionUtils.newInstance(providerClass, metricsConf);
   }
 
-  public ExternalSinkProvider getExternalSinkProvider() {
-    Class<?> providerClass = metricsConf.getClassByNameOrNull(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
-    if (providerClass != null) {
-      return (ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf);
+  /**
+   * List of external sink provider classes. Comma-separated.
+   */
+  public List<ExternalSinkProvider> getExternalSinkProviderList() {
+    Class<?>[] providerClasses = metricsConf.getClasses(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
+    List<ExternalSinkProvider> providerList = new ArrayList<>();
+    if (providerClasses != null) {
+      for (Class<?> providerClass : providerClasses) {
+        providerList.add((ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf));
+      }
     }
-    return null;
+    return providerList;
   }
 
   public String getInternalCacheHeapPercent(String instanceName) {
index 48887d9..7c7683b 100644 (file)
@@ -1,8 +1,3 @@
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
-
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,9 +16,14 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
  * limitations under the License.
  */
 
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
+
+
 /**
  * Configurable provider for sink classes that match the metrics sources.
- * Provider can return same sink of different sinks for each source.
+ * Provider can return same sink or different sinks for each source.
  */
 public interface ExternalSinkProvider {
 
index bb84c8a..9c2a93e 100644 (file)
@@ -92,7 +92,7 @@ public class HttpSinkProvider implements ExternalSinkProvider {
 
   @Override
   public ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
-    return null;
+    return new DefaultHttpMetricsSink();
   }
 
   protected HttpURLConnection getConnection(String spec) throws IOException {
@@ -147,7 +147,7 @@ public class HttpSinkProvider implements ExternalSinkProvider {
     @Override
     public int getSinkTimeOutSeconds() {
       try {
-        return conf.getMetricsConf().getInt("timeline.metrics.service.external.http.sink.timeout.seconds", 10);
+        return conf.getMetricsConf().getInt("timeline.metrics.external.sink.http.timeout.seconds", 10);
       } catch (Exception e) {
         return 10;
       }
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/KafkaSinkProvider.java
new file mode 100644 (file)
index 0000000..3b34b55
--- /dev/null
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_ACKS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_BATCH_SIZE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_BUFFER_MEM;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_LINGER_MS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_RETRIES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_SERVERS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.KAFKA_SINK_TIMEOUT_SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/*
+  This will be used by the single Metrics committer thread. Hence it is
+  important to make this non-blocking export.
+ */
+public class KafkaSinkProvider implements ExternalSinkProvider {
+  private static String TOPIC_NAME = "ambari-metrics-topic";
+  private static final Log LOG = LogFactory.getLog(KafkaSinkProvider.class);
+
+  private Producer producer;
+  private int TIMEOUT_SECONDS = 10;
+  private int FLUSH_SECONDS = 3;
+
+  ObjectMapper objectMapper = new ObjectMapper();
+
+  public KafkaSinkProvider() {
+    TimelineMetricConfiguration configuration = TimelineMetricConfiguration.getInstance();
+
+    Properties configProperties = new Properties();
+    try {
+      configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getMetricsConf().getTrimmed(KAFKA_SERVERS));
+      configProperties.put(ProducerConfig.ACKS_CONFIG, configuration.getMetricsConf().getTrimmed(KAFKA_ACKS, "all"));
+      // Avoid duplicates - No transactional semantics
+      configProperties.put(ProducerConfig.RETRIES_CONFIG, configuration.getMetricsConf().getInt(KAFKA_RETRIES, 0));
+      configProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, configuration.getMetricsConf().getInt(KAFKA_BATCH_SIZE, 128));
+      configProperties.put(ProducerConfig.LINGER_MS_CONFIG, configuration.getMetricsConf().getInt(KAFKA_LINGER_MS, 1));
+      configProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configuration.getMetricsConf().getLong(KAFKA_BUFFER_MEM, 33554432)); // 32 MB
+      FLUSH_SECONDS = configuration.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
+      TIMEOUT_SECONDS = configuration.getMetricsConf().getInt(KAFKA_SINK_TIMEOUT_SECONDS, 10);
+    } catch (Exception e) {
+      LOG.error("Configuration error!", e);
+      throw new ExceptionInInitializerError(e);
+    }
+    configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
+    configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer");
+
+    
+
+    producer = new KafkaProducer(configProperties);
+  }
+
+  @Override
+  public ExternalMetricsSink getExternalMetricsSink(SOURCE_NAME sourceName) {
+    switch (sourceName) {
+      case RAW_METRICS:
+        return new KafkaRawMetricsSink();
+      default:
+        throw new UnsupportedOperationException("Provider does not support " +
+          "the expected source " + sourceName);
+    }
+  }
+
+  class KafkaRawMetricsSink implements ExternalMetricsSink {
+
+    @Override
+    public int getSinkTimeOutSeconds() {
+      return TIMEOUT_SECONDS;
+    }
+
+    @Override
+    public int getFlushSeconds() {
+      return FLUSH_SECONDS;
+    }
+
+    @Override
+    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
+      JsonNode jsonNode = objectMapper.valueToTree(metrics);
+      ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(TOPIC_NAME, jsonNode);
+      Future<RecordMetadata> f = producer.send(rec);
+    }
+  }
+
+}
index b97c39f..c6b071f 100644 (file)
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 public class DefaultInternalMetricsSourceProvider implements InternalSourceProvider {
   private static final Log LOG = LogFactory.getLog(DefaultInternalMetricsSourceProvider.class);
 
-  // TODO: Implement read based sources for higher level data
+  // TODO: Implement read based sources for higher order data
   @Override
   public InternalMetricsSource getInternalMetricsSource(SOURCE_NAME sourceName, int sinkIntervalSeconds, ExternalMetricsSink sink) {
     if (sink == null) {
index 967d819..879577a 100644 (file)
@@ -63,21 +63,14 @@ public class RawMetricsSource implements InternalMetricsSource {
   }
 
   private void initializeFixedRateScheduler() {
-    executorService.scheduleAtFixedRate(new Runnable() {
-      @Override
-      public void run() {
-        rawMetricsSink.sinkMetricData(cache.evictAll());
-      }
-    }, rawMetricsSink.getFlushSeconds(), rawMetricsSink.getFlushSeconds(), TimeUnit.SECONDS);
+    executorService.scheduleAtFixedRate(() -> rawMetricsSink.sinkMetricData(cache.evictAll()),
+      rawMetricsSink.getFlushSeconds(), rawMetricsSink.getFlushSeconds(), TimeUnit.SECONDS);
   }
 
   private void submitDataWithTimeout(final Collection<TimelineMetrics> metrics) {
-    Future f = executorService.submit(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        rawMetricsSink.sinkMetricData(metrics);
-        return null;
-      }
+    Future f = executorService.submit(() -> {
+      rawMetricsSink.sinkMetricData(metrics);
+      return null;
     });
     try {
       f.get(rawMetricsSink.getSinkTimeOutSeconds(), TimeUnit.SECONDS);