[AMBARI-24557] Remove legacy storm sink module from ambari-metrics. (#2193)
authoravijayanhwx <avijayan@hortonworks.com>
Wed, 29 Aug 2018 19:09:53 +0000 (12:09 -0700)
committerGitHub <noreply@github.com>
Wed, 29 Aug 2018 19:09:53 +0000 (12:09 -0700)
ambari-metrics-assembly/pom.xml
ambari-metrics-assembly/src/main/assembly/sink-windows.xml
ambari-metrics-assembly/src/main/assembly/sink.xml
ambari-metrics-storm-sink-legacy/pom.xml [deleted file]
ambari-metrics-storm-sink-legacy/src/main/assemblies/empty.xml [deleted file]
ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java [deleted file]
ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java [deleted file]
ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java [deleted file]
pom.xml

index f89b9df..4f36e5a 100644 (file)
@@ -39,7 +39,6 @@
     <grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir>
     <hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
     <storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
-    <storm-sink-legacy.dir>${project.basedir}/../ambari-metrics-storm-sink-legacy</storm-sink-legacy.dir>
     <flume-sink.dir>${project.basedir}/../ambari-metrics-flume-sink</flume-sink.dir>
     <kafka-sink.dir>${project.basedir}/../ambari-metrics-kafka-sink</kafka-sink.dir>
     <python.ver>python &gt;= 2.6</python.ver>
@@ -53,7 +52,6 @@
     <deb.dependency.list>${deb.python.ver},python-dev,gcc</deb.dependency.list>
     <hadoop.sink.jar>ambari-metrics-hadoop-sink-with-common-${project.version}.jar</hadoop.sink.jar>
     <storm.sink.jar>ambari-metrics-storm-sink-with-common-${project.version}.jar</storm.sink.jar>
-    <storm.sink.legacy.jar>ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</storm.sink.legacy.jar>
     <flume.sink.jar>ambari-metrics-flume-sink-with-common-${project.version}.jar</flume.sink.jar>
     <kafka.sink.jar>ambari-metrics-kafka-sink-with-common-${project.version}.jar</kafka.sink.jar>
   </properties>
                       </sources>
                     </mapping>
                     <mapping>
-                      <directory>/usr/lib/storm/lib</directory>
-                      <sources>
-                        <source>
-                          <location>${storm-sink-legacy.dir}/target/ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</location>
-                        </source>
-                      </sources>
-                    </mapping>
-                    <mapping>
                       <directory>/usr/lib/ambari-metrics-kafka-sink</directory>
                       <sources>
                         <source>
                     <prefix>/usr/lib/storm/lib</prefix>
                   </mapper>
                 </data>
-                <data>
-                  <src>${storm-sink-legacy.dir}/target/${storm.sink.legacy.jar}</src>
-                  <type>file</type>
-                  <mapper>
-                    <type>perm</type>
-                    <filemode>644</filemode>
-                    <dirmode>755</dirmode>
-                    <prefix>/usr/lib/storm/lib</prefix>
-                  </mapper>
-                </data>
 
                 <!-- kafka sink -->
 
     </dependency>
     <dependency>
       <groupId>org.apache.ambari</groupId>
-      <artifactId>ambari-metrics-storm-sink-legacy</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.ambari</groupId>
       <artifactId>ambari-metrics-kafka-sink</artifactId>
       <version>${project.version}</version>
     </dependency>
index 14b49b2..e82d2d4 100644 (file)
       <outputDirectory>hadoop-sink/conf</outputDirectory>
     </fileSet>
     <fileSet>
-      <directory>${storm-sink-legacy.dir}/src/main/conf</directory>
-      <outputDirectory>hadoop-sink/conf</outputDirectory>
-    </fileSet>
-    <fileSet>
       <directory>${kafka-sink.dir}/target/lib</directory>
       <outputDirectory>hadoop-sink/lib</outputDirectory>
     </fileSet>
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
     <file>
-      <source>${storm-sink-legacy.dir}/target/ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</source>
-      <outputDirectory>hadoop-sink</outputDirectory>
-    </file>
-    <file>
       <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
index 34cdbc3..1400c7b 100644 (file)
       <outputDirectory>hadoop-sink/conf</outputDirectory>
     </fileSet>
     <fileSet>
-      <directory>${storm-sink-legacy.dir}/src/main/conf</directory>
-      <outputDirectory>hadoop-sink/conf</outputDirectory>
-    </fileSet>
-    <fileSet>
       <directory>${kafka-sink.dir}/target/lib</directory>
       <outputDirectory>hadoop-sink/lib</outputDirectory>
     </fileSet>
     </file>
     <file>
       <fileMode>644</fileMode>
-      <source>${storm-sink-legacy.dir}/target/ambari-metrics-storm-sink-legacy-with-common-${project.version}.jar</source>
-      <outputDirectory>hadoop-sink</outputDirectory>
-    </file>
-    <file>
-      <fileMode>644</fileMode>
       <source>${kafka-sink.dir}/target/ambari-metrics-kafka-sink-with-common-${project.version}.jar</source>
       <outputDirectory>hadoop-sink</outputDirectory>
     </file>
diff --git a/ambari-metrics-storm-sink-legacy/pom.xml b/ambari-metrics-storm-sink-legacy/pom.xml
deleted file mode 100644 (file)
index 4fc4d17..0000000
+++ /dev/null
@@ -1,207 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
-                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <parent>
-    <artifactId>ambari-metrics</artifactId>
-    <groupId>org.apache.ambari</groupId>
-    <version>2.0.0.0-SNAPSHOT</version>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>ambari-metrics-storm-sink-legacy</artifactId>
-  <version>2.0.0.0-SNAPSHOT</version>
-  <name>Ambari Metrics Storm Sink (Legacy)</name>
-  <packaging>jar</packaging>
-
-  <properties>
-    <!--<storm.version>0.9.3.2.2.1.0-2340</storm.version>-->
-    <storm.version>0.10.0.2.3.0.0-2557</storm.version>
-  </properties>
-
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>3.0</version>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <version>1.8</version>
-        <executions>
-          <execution>
-            <id>parse-version</id>
-            <phase>validate</phase>
-            <goals>
-              <goal>parse-version</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>regex-property</id>
-            <goals>
-              <goal>regex-property</goal>
-            </goals>
-            <configuration>
-              <name>ambariVersion</name>
-              <value>${project.version}</value>
-              <regex>^([0-9]+)\.([0-9]+)\.([0-9]+)\.([0-9]+)(\.|-).*</regex>
-              <replacement>$1.$2.$3.$4</replacement>
-              <failIfNoMatch>false</failIfNoMatch>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>com.github.goldin</groupId>
-        <artifactId>copy-maven-plugin</artifactId>
-        <version>0.2.5</version>
-        <executions>
-          <execution>
-            <id>create-archive</id>
-            <phase>none</phase>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <version>2.2</version>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <outputFile>${project.build.directory}/${project.artifactId}-with-common-${project.version}.jar</outputFile>
-          <minimizeJar>false</minimizeJar>
-          <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
-          <artifactSet>
-            <includes>
-              <include>org.apache.ambari:ambari-metrics-common</include>
-              <include>org.codehaus.jackson:jackson-mapper-asl</include>
-              <include>org.codehaus.jackson:jackson-core-asl</include>
-              <include>org.codehaus.jackson:jackson-xc</include>
-              <include>org.apache.hadoop:hadoop-annotations</include>
-              <include>commons-logging:commons-logging</include>
-              <include>org.apache.commons:commons-lang3</include>
-              <include>commons-codec:commons-codec</include>
-            </includes>
-          </artifactSet>
-          <relocations>
-            <relocation>
-              <pattern>org.apache.commons.logging</pattern>
-              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.logging</shadedPattern>
-            </relocation>
-            <relocation>
-              <pattern>org.apache.hadoop.classification</pattern>
-              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.hadoop.classification</shadedPattern>
-            </relocation>
-            <relocation>
-              <pattern>org.codehaus.jackson</pattern>
-              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jackson</shadedPattern>
-            </relocation>
-            <relocation>
-              <pattern>org.apache.commons.lang3</pattern>
-              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.lang3</shadedPattern>
-            </relocation>
-            <relocation>
-              <pattern>org.apache.commons.codec</pattern>
-              <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.codec</shadedPattern>
-            </relocation>
-          </relocations>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.vafer</groupId>
-        <artifactId>jdeb</artifactId>
-        <version>1.0.1</version>
-        <executions>
-          <execution>
-            <!--Stub execution on direct plugin call - workaround for ambari deb build process-->
-            <id>stub-execution</id>
-            <phase>none</phase>
-            <goals>
-              <goal>jdeb</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <skip>true</skip>
-          <attach>false</attach>
-          <submodules>false</submodules>
-          <controlDir>${project.basedir}/../src/main/package/deb/control</controlDir>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-      <version>3.3.2</version>
-    </dependency>
-    <dependency>
-      <groupId>commons-codec</groupId>
-      <artifactId>commons-codec</artifactId>
-      <version>1.8</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.storm</groupId>
-      <artifactId>storm-core</artifactId>
-      <version>${storm.version}</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.ambari</groupId>
-      <artifactId>ambari-metrics-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-mapper-asl</artifactId>
-      <version>1.9.13</version>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-      <version>4.10</version>
-    </dependency>
-    <dependency>
-      <groupId>org.easymock</groupId>
-      <artifactId>easymock</artifactId>
-      <version>3.2</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-api-easymock</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-module-junit4</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-</project>
diff --git a/ambari-metrics-storm-sink-legacy/src/main/assemblies/empty.xml b/ambari-metrics-storm-sink-legacy/src/main/assemblies/empty.xml
deleted file mode 100644 (file)
index 35738b1..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-  
-       http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<assembly>
-    <id>empty</id>
-    <formats/>
-</assembly>
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
deleted file mode 100644 (file)
index 842fad8..0000000
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.storm;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.Validate;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
-
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.SupervisorSummary;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.metric.IClusterReporter;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
-    implements IClusterReporter {
-
-  public static final String METRICS_COLLECTOR_CATEGORY = "metrics_collector";
-  public static final String APP_ID = "appId";
-
-  private String hostname;
-  private String collectorUri;
-  private String port;
-  private Collection<String> collectorHosts;
-  private String zkQuorum;
-  private String protocol;
-  private boolean setInstanceId;
-  private String instanceId;
-  private NimbusClient nimbusClient;
-  private String applicationId;
-  private int timeoutSeconds;
-  private boolean hostInMemoryAggregationEnabled;
-  private int hostInMemoryAggregationPort;
-  private String hostInMemoryAggregationProtocol;
-
-  public StormTimelineMetricsReporter() {
-
-  }
-
-  @Override
-  protected String getCollectorUri(String host) {
-    return constructTimelineMetricUri(protocol, host, port);
-  }
-
-  @Override
-  protected String getCollectorProtocol() {
-    return protocol;
-  }
-
-  @Override
-  protected int getTimeoutSeconds() {
-    return timeoutSeconds;
-  }
-
-  @Override
-  protected String getZookeeperQuorum() {
-    return zkQuorum;
-  }
-
-  @Override
-  protected Collection<String> getConfiguredCollectorHosts() {
-    return collectorHosts;
-  }
-
-  @Override
-  protected String getCollectorPort() {
-    return port;
-  }
-
-  @Override
-  protected String getHostname() {
-    return hostname;
-  }
-
-  @Override
-  protected boolean isHostInMemoryAggregationEnabled() {
-    return hostInMemoryAggregationEnabled;
-  }
-
-  @Override
-  protected int getHostInMemoryAggregationPort() {
-    return hostInMemoryAggregationPort;
-  }
-
-  @Override
-  protected String getHostInMemoryAggregationProtocol() {
-    return hostInMemoryAggregationProtocol;
-  }
-
-  @Override
-  public void prepare(Map conf) {
-    LOG.info("Preparing Storm Metrics Reporter");
-    try {
-      try {
-        hostname = InetAddress.getLocalHost().getHostName();
-        // If not FQDN , call  DNS
-        if ((hostname == null) || (!hostname.contains("."))) {
-          hostname = InetAddress.getLocalHost().getCanonicalHostName();
-        }
-      } catch (UnknownHostException e) {
-        LOG.error("Could not identify hostname.");
-        throw new RuntimeException("Could not identify hostname.", e);
-      }
-      Validate.notNull(conf.get(METRICS_COLLECTOR_CATEGORY), METRICS_COLLECTOR_CATEGORY + " can not be null");
-      Map cf = (Map) conf.get(METRICS_COLLECTOR_CATEGORY);
-      Map stormConf = Utils.readStormConfig();
-      this.nimbusClient = NimbusClient.getConfiguredClient(stormConf);
-
-      collectorHosts = parseHostsStringIntoCollection(cf.get(COLLECTOR_HOSTS_PROPERTY).toString());
-      protocol = cf.get(COLLECTOR_PROTOCOL) != null ? cf.get(COLLECTOR_PROTOCOL).toString() : "http";
-      port = cf.get(COLLECTOR_PORT) != null ? cf.get(COLLECTOR_PORT).toString() : "6188";
-      Object zkQuorumObj = cf.get(COLLECTOR_ZOOKEEPER_QUORUM);
-      if (zkQuorumObj != null) {
-        zkQuorum = zkQuorumObj.toString();
-      } else {
-        zkQuorum = cf.get(ZOOKEEPER_QUORUM) != null ? cf.get(ZOOKEEPER_QUORUM).toString() : null;
-      }
-
-      timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ?
-          Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
-          DEFAULT_POST_TIMEOUT_SECONDS;
-      applicationId = cf.get(APP_ID).toString();
-      if (cf.containsKey(SET_INSTANCE_ID_PROPERTY)) {
-        setInstanceId = Boolean.getBoolean(cf.get(SET_INSTANCE_ID_PROPERTY).toString());
-        instanceId = cf.get(INSTANCE_ID_PROPERTY).toString();
-      }
-      hostInMemoryAggregationEnabled = Boolean.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY) != null ?
-        cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY).toString() : "false");
-      hostInMemoryAggregationPort = Integer.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY) != null ?
-        cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY).toString() : "61888");
-      hostInMemoryAggregationProtocol = cf.get(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY) != null ?
-        cf.get(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY).toString() : "http";
-
-      collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
-      if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
-        String trustStorePath = cf.get(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
-        String trustStoreType = cf.get(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
-        String trustStorePwd = cf.get(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
-        loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
-      }
-    } catch (Exception e) {
-      LOG.warn("Could not initialize metrics collector, please specify " +
-          "protocol, host, port under $STORM_HOME/conf/config.yaml ", e);
-    }
-    // Initialize the collector write strategy
-    super.init();
-  }
-
-  @Override
-  public void reportMetrics() throws Exception {
-    List<TimelineMetric> totalMetrics = new ArrayList<TimelineMetric>(7);
-    ClusterSummary cs = this.nimbusClient.getClient().getClusterInfo();
-    long currentTimeMillis = System.currentTimeMillis();
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-        applicationId, "Supervisors", String.valueOf(cs.get_supervisors_size())));
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-        applicationId, "Topologies", String.valueOf(cs.get_topologies_size())));
-
-    List<SupervisorSummary> sups = cs.get_supervisors();
-    int totalSlots = 0;
-    int usedSlots = 0;
-    for (SupervisorSummary ssum : sups) {
-      totalSlots += ssum.get_num_workers();
-      usedSlots += ssum.get_num_used_workers();
-    }
-    int freeSlots = totalSlots - usedSlots;
-
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-        applicationId, "Total Slots", String.valueOf(totalSlots)));
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-        applicationId, "Used Slots", String.valueOf(usedSlots)));
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-        applicationId, "Free Slots", String.valueOf(freeSlots)));
-
-    List<TopologySummary> topos = cs.get_topologies();
-    int totalExecutors = 0;
-    int totalTasks = 0;
-    for (TopologySummary topo : topos) {
-      totalExecutors += topo.get_num_executors();
-      totalTasks += topo.get_num_tasks();
-    }
-
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-        applicationId, "Total Executors", String.valueOf(totalExecutors)));
-    totalMetrics.add(createTimelineMetric(currentTimeMillis,
-        applicationId, "Total Tasks", String.valueOf(totalTasks)));
-
-    TimelineMetrics timelineMetrics = new TimelineMetrics();
-    timelineMetrics.setMetrics(totalMetrics);
-
-    try {
-      emitMetrics(timelineMetrics);
-    } catch (UnableToConnectException e) {
-      LOG.warn("Unable to connect to Metrics Collector " + e.getConnectUrl() + ". " + e.getMessage());
-    }
-
-  }
-
-  private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) {
-    TimelineMetric timelineMetric = new TimelineMetric();
-    timelineMetric.setMetricName(attributeName);
-    timelineMetric.setHostName(hostname);
-    if (setInstanceId) {
-      timelineMetric.setInstanceId(instanceId);
-    }
-    timelineMetric.setAppId(component);
-    timelineMetric.setStartTime(currentTimeMillis);
-    timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
-    return timelineMetric;
-  }
-
-}
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
deleted file mode 100644 (file)
index e3494fd..0000000
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.storm;
-
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS;
-import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
-
-public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
-  private static final String[] WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME = { ".", "_" };
-
-  // create String manually in order to not rely on Guava Joiner or having our own
-  private static final String JOINED_WARN_STRINGS_FOR_MESSAGE = "\".\", \"_\"";
-
-  public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
-  public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm";
-  public static final String METRIC_NAME_PREFIX_KAFKA_OFFSET = "kafkaOffset.";
-
-  private String collectorUri;
-  private TimelineMetricsCache metricsCache;
-  private String hostname;
-  private int timeoutSeconds;
-  private Collection<String> collectorHosts;
-  private String zkQuorum;
-  private String protocol;
-  private String port;
-  private String topologyName;
-  private String applicationId;
-  private boolean setInstanceId;
-  private String instanceId;
-  private boolean hostInMemoryAggregationEnabled;
-  private int hostInMemoryAggregationPort;
-  private String hostInMemoryAggregationProtocol;
-
-  @Override
-  protected String getCollectorUri(String host) {
-    return constructTimelineMetricUri(protocol, host, port);
-  }
-
-  @Override
-  protected String getCollectorProtocol() {
-    return protocol;
-  }
-
-  @Override
-  protected int getTimeoutSeconds() {
-    return timeoutSeconds;
-  }
-
-  @Override
-  protected String getZookeeperQuorum() {
-    return zkQuorum;
-  }
-
-  @Override
-  protected Collection<String> getConfiguredCollectorHosts() {
-    return collectorHosts;
-  }
-
-  @Override
-  protected String getCollectorPort() {
-    return port;
-  }
-
-  @Override
-  protected String getHostname() {
-    return hostname;
-  }
-
-  @Override
-  protected boolean isHostInMemoryAggregationEnabled() {
-    return hostInMemoryAggregationEnabled;
-  }
-
-  @Override
-  protected int getHostInMemoryAggregationPort() {
-    return hostInMemoryAggregationPort;
-  }
-
-  @Override
-  protected String getHostInMemoryAggregationProtocol() {
-    return hostInMemoryAggregationProtocol;
-  }
-
-  @Override
-  public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
-    LOG.info("Preparing Storm Metrics Sink");
-    try {
-      hostname = InetAddress.getLocalHost().getHostName();
-      //If not FQDN , call  DNS
-      if ((hostname == null) || (!hostname.contains("."))) {
-        hostname = InetAddress.getLocalHost().getCanonicalHostName();
-      }
-    } catch (UnknownHostException e) {
-      LOG.error("Could not identify hostname.");
-      throw new RuntimeException("Could not identify hostname.", e);
-    }
-    Configuration configuration = new Configuration("/storm-metrics2.properties");
-    timeoutSeconds = Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS,
-        String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
-    int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
-        String.valueOf(MAX_RECS_PER_NAME_DEFAULT)));
-    int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
-        String.valueOf(MAX_EVICTION_TIME_MILLIS)));
-    applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
-    metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
-    collectorHosts = parseHostsStringIntoCollection(configuration.getProperty(COLLECTOR_HOSTS_PROPERTY));
-    zkQuorum = configuration.getProperty("zookeeper.quorum");
-    protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
-    port = configuration.getProperty(COLLECTOR_PORT, "6188");
-
-    instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, null);
-    setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
-    hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
-    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
-    hostInMemoryAggregationProtocol = configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
-    // Initialize the collector write strategy
-    super.init();
-
-    if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
-      String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
-      String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
-      String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
-      loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
-    }
-    this.topologyName = removeNonce(topologyContext.getStormId());
-    warnIfTopologyNameContainsWarnString(topologyName);
-  }
-
-  @Override
-  public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
-    List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
-
-    for (DataPoint dataPoint : dataPoints) {
-      LOG.debug(dataPoint.name + " = " + dataPoint.value);
-      List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
-
-      for (DataPoint populatedDataPoint : populatedDataPoints) {
-        String metricName;
-        if (populatedDataPoint.name.startsWith(METRIC_NAME_PREFIX_KAFKA_OFFSET)) {
-          metricName = createKafkaOffsetMetricName(populatedDataPoint.name);
-        } else {
-          metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost,
-              taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name);
-        }
-
-        LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value);
-
-        TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000,
-            taskInfo.srcWorkerHost, metricName, Double.valueOf(populatedDataPoint.value.toString()));
-
-        // Put intermediate values into the cache until it is time to send
-        metricsCache.putTimelineMetric(timelineMetric);
-
-        TimelineMetric cachedMetric = metricsCache.getTimelineMetric(timelineMetric.getMetricName());
-
-        if (cachedMetric != null) {
-          metricList.add(cachedMetric);
-        }
-      }
-    }
-
-    if (!metricList.isEmpty()) {
-      TimelineMetrics timelineMetrics = new TimelineMetrics();
-      timelineMetrics.setMetrics(metricList);
-
-      try {
-        emitMetrics(timelineMetrics);
-      } catch (UnableToConnectException uce) {
-        LOG.warn("Unable to send metrics to collector by address:" + uce.getConnectUrl());
-      }
-    }
-  }
-
-  @Override
-  public void cleanup() {
-    LOG.info("Stopping Storm Metrics Sink");
-  }
-
-  // purpose just for testing
-  void setTopologyName(String topologyName) {
-    this.topologyName = topologyName;
-  }
-
-  private String removeNonce(String topologyId) {
-    return topologyId.substring(0, topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-"));
-  }
-
-  private List<DataPoint> populateDataPoints(DataPoint dataPoint) {
-    List<DataPoint> dataPoints = new ArrayList<>();
-
-    if (dataPoint.value == null) {
-      LOG.warn("Data point with name " + dataPoint.name + " is null. Discarding." + dataPoint.name);
-    } else if (dataPoint.value instanceof Map) {
-      Map<String, Object> dataMap = (Map<String, Object>) dataPoint.value;
-
-      for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
-        Double value = convertValueToDouble(entry.getKey(), entry.getValue());
-        if (value != null) {
-          dataPoints.add(new DataPoint(dataPoint.name + "." + entry.getKey(), value));
-        }
-      }
-    } else {
-      Double value = convertValueToDouble(dataPoint.name, dataPoint.value);
-      if (value != null) {
-        dataPoints.add(new DataPoint(dataPoint.name, value));
-      }
-    }
-
-    return dataPoints;
-  }
-
-  private Double convertValueToDouble(String metricName, Object value) {
-    if (value instanceof Number) {
-      return ((Number) value).doubleValue();
-    } else if (value instanceof String) {
-      try {
-        return Double.parseDouble((String) value);
-      } catch (NumberFormatException e) {
-        LOG.warn("Data point with name " + metricName + " doesn't have number format value " +
-            value + ". Discarding.");
-      }
-
-      return null;
-    } else {
-      LOG.warn("Data point with name " + metricName + " has value " + value +
-          " which is not supported. Discarding.");
-
-      return null;
-    }
-  }
-
-  private TimelineMetric createTimelineMetric(long currentTimeMillis, String hostName,
-                                              String attributeName, Double attributeValue) {
-    TimelineMetric timelineMetric = new TimelineMetric();
-    timelineMetric.setMetricName(attributeName);
-    timelineMetric.setHostName(hostName);
-    if (setInstanceId) {
-      timelineMetric.setInstanceId(instanceId);
-    }
-    timelineMetric.setAppId(applicationId);
-    timelineMetric.setStartTime(currentTimeMillis);
-    timelineMetric.setType(ClassUtils.getShortCanonicalName(
-        attributeValue, "Number"));
-    timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue);
-    return timelineMetric;
-  }
-
-  private String createMetricName(String componentId, String workerHost, int workerPort, int taskId,
-      String attributeName) {
-    // <topology name>.<component name>.<worker host>.<worker port>.<task id>.<metric name>
-    String metricName = "topology." + topologyName + "." + componentId + "." + workerHost + "." + workerPort +
-        "." + taskId + "." + attributeName;
-
-    // since '._' is treat as special character (separator) so it should be replaced
-    return metricName.replace('_', '-');
-  }
-
-  private String createKafkaOffsetMetricName(String kafkaOffsetMetricName) {
-    // get rid of "kafkaOffset."
-    // <topic>/<metric name (starts with total)> or <topic>/partition_<partition_num>/<metricName>
-    String tempMetricName = kafkaOffsetMetricName.substring(METRIC_NAME_PREFIX_KAFKA_OFFSET.length());
-
-    String[] slashSplittedNames = tempMetricName.split("/");
-
-    if (slashSplittedNames.length == 1) {
-      // unknown metrics
-      throw new IllegalArgumentException("Unknown metrics for kafka offset metric: " + kafkaOffsetMetricName);
-    }
-
-    String topic = slashSplittedNames[0];
-    String metricName = "topology." + topologyName + ".kafka-topic." + topic;
-    if (slashSplittedNames.length > 2) {
-      // partition level
-      metricName = metricName + "." + slashSplittedNames[1] + "." + slashSplittedNames[2];
-    } else {
-      // topic level
-      metricName = metricName + "." + slashSplittedNames[1];
-    }
-
-    // since '._' is treat as special character (separator) so it should be replaced
-    return metricName.replace('_', '-');
-  }
-
-  private void warnIfTopologyNameContainsWarnString(String name) {
-    for (String warn : WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME) {
-      if (name.contains(warn)) {
-        LOG.warn("Topology name \"" + name + "\" contains \"" + warn + "\" which can be problematic for AMS.");
-        LOG.warn("Encouraged to not using any of these strings: " + JOINED_WARN_STRINGS_FOR_MESSAGE);
-        LOG.warn("Same suggestion applies to component name.");
-      }
-    }
-  }
-
-  public void setMetricsCache(TimelineMetricsCache metricsCache) {
-    this.metricsCache = metricsCache;
-  }
-
-}
diff --git a/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
deleted file mode 100644 (file)
index 3b3e236..0000000
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.storm;
-
-import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.METRIC_NAME_PREFIX_KAFKA_OFFSET;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createMockBuilder;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import backtype.storm.metric.api.IMetricsConsumer;
-
-public class StormTimelineMetricsSinkTest {
-  @Test
-  public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException {
-    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
-    stormTimelineMetricsSink.setTopologyName("topology1");
-    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
-    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
-    replay(timelineMetricsCache);
-    stormTimelineMetricsSink.handleDataPoints(
-        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
-        Collections.singleton(new IMetricsConsumer.DataPoint("key1", "value1")));
-    verify(timelineMetricsCache);
-  }
-
-  @Test
-  @Ignore // TODO: Fix for failover
-  public void testNumericMetricMetricSubmission() throws InterruptedException, IOException {
-    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
-    stormTimelineMetricsSink.setTopologyName("topology1");
-    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
-    expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1"))
-        .andReturn(new TimelineMetric()).once();
-    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
-    expectLastCall().once();
-    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
-    replay(timelineMetricsCache);
-    stormTimelineMetricsSink.handleDataPoints(
-        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
-        Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
-    verify(timelineMetricsCache);
-  }
-
-  @Test
-  @Ignore // TODO: Fix for failover
-  public void testTopicLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException {
-    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
-    stormTimelineMetricsSink.setTopologyName("topology1");
-    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
-    expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.totalLatestTimeOffset"))
-        .andReturn(new TimelineMetric()).once();
-    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
-    expectLastCall().once();
-    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
-    replay(timelineMetricsCache);
-    stormTimelineMetricsSink.handleDataPoints(
-        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
-        Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/totalLatestTimeOffset", 42)));
-    verify(timelineMetricsCache);
-  }
-
-  @Test
-  @Ignore // TODO: Fix for failover
-  public void testPartitionLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException {
-    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
-    stormTimelineMetricsSink.setTopologyName("topology1");
-    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
-    expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.partition-1.latestTimeOffset"))
-        .andReturn(new TimelineMetric()).once();
-    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
-    expectLastCall().once();
-    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
-    replay(timelineMetricsCache);
-    stormTimelineMetricsSink.handleDataPoints(
-        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
-        Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/partition_1/latestTimeOffset", 42)));
-    verify(timelineMetricsCache);
-  }
-
-  @Test
-  @Ignore // TODO: Fix for failover
-  public void testMapMetricMetricSubmission() throws InterruptedException, IOException {
-    StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink();
-    stormTimelineMetricsSink.setTopologyName("topology1");
-    TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
-    expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field1"))
-        .andReturn(new TimelineMetric()).once();
-    expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field2"))
-        .andReturn(new TimelineMetric()).once();
-    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
-    expectLastCall().once();
-    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
-    replay(timelineMetricsCache);
-
-    Map<String, Object> valueMap = new HashMap<>();
-    valueMap.put("field1", 53);
-    valueMap.put("field2", 64.12);
-    stormTimelineMetricsSink.handleDataPoints(
-        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
-        Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMap)));
-    verify(timelineMetricsCache);
-  }
-}
diff --git a/pom.xml b/pom.xml
index c6e9335..7b5f02f 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -28,7 +28,6 @@
     <module>ambari-metrics-flume-sink</module>
     <module>ambari-metrics-kafka-sink</module>
     <module>ambari-metrics-storm-sink</module>
-    <module>ambari-metrics-storm-sink-legacy</module>
     <module>ambari-metrics-timelineservice</module>
     <module>ambari-metrics-host-monitoring</module>
     <module>ambari-metrics-grafana</module>