AMBARI-23863 : Fix issues in app aggregator in AMS memory cluster aggregation.
author: Aravindan Vijayan <avijayan@hortonworks.com>
Wed, 16 May 2018 19:15:44 +0000 (12:15 -0700)
committer: avijayanhwx <avijayan@hortonworks.com>
Wed, 16 May 2018 22:25:44 +0000 (15:25 -0700)
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricDistributedCache.java
ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCacheTest.java
ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java

index d09d4bb..2be9b01 100644 (file)
@@ -102,7 +102,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
 
   private TimelineMetricDistributedCache startCacheNode() throws MalformedURLException, URISyntaxException {
     //TODO make configurable
-    return new TimelineMetricsIgniteCache();
+    return new TimelineMetricsIgniteCache(metricMetadataManager);
   }
 
 
@@ -408,7 +408,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
     hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false);
 
     if (configuration.isCollectorInMemoryAggregationEnabled()) {
-      cache.putMetrics(metrics.getMetrics(), metricMetadataManager);
+      cache.putMetrics(metrics.getMetrics());
     }
 
     return response;
index 000b3bc..1ae6346 100644 (file)
@@ -27,6 +27,6 @@ import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataM
 
 public interface TimelineMetricDistributedCache {
   Map<TimelineClusterMetric, MetricClusterAggregate> evictMetricAggregates(Long startTime, Long endTime);
-  void putMetrics(Collection<TimelineMetric> elements, TimelineMetricMetadataManager metricMetadataManager);
+  void putMetrics(Collection<TimelineMetric> elements);
   Map<String, Double> getPointInTimeCacheMetrics();
 }
index e085fd8..b39c2a3 100644 (file)
  */
 package org.apache.ambari.metrics.core.timeline;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY;
-import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
-import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
-import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
-import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaMetricPatterns;
-
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-import javax.cache.Cache;
-import javax.cache.expiry.CreatedExpiryPolicy;
-import javax.cache.expiry.Duration;
-
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorHAHelper;
-import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
-import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata;
-import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.Ignition;
@@ -72,6 +44,35 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.ssl.SslContextFactory;
 
+import javax.cache.Cache;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
+import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaMetricPatterns;
+
 public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCache {
   private static final Log LOG =
       LogFactory.getLog(TimelineMetricsIgniteCache.class);
@@ -80,14 +81,16 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach
   private boolean interpolationEnabled;
   private List<String> skipAggrPatternStrings = new ArrayList<>();
   private List<String> appIdsToAggregate;
+  private TimelineMetricMetadataManager metricMetadataManager;
 
 
-  public TimelineMetricsIgniteCache() throws MalformedURLException, URISyntaxException {
+  public TimelineMetricsIgniteCache(TimelineMetricMetadataManager metricMetadataManager) throws MalformedURLException, URISyntaxException {
     TimelineMetricConfiguration timelineMetricConfiguration = TimelineMetricConfiguration.getInstance();
     Configuration metricConf = timelineMetricConfiguration.getMetricsConf();
     Configuration sslConf = timelineMetricConfiguration.getMetricsSslConf();
 
     IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
+    this.metricMetadataManager = metricMetadataManager;
 
     //TODO add config to disable logging
 
@@ -204,11 +207,10 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach
    * calculates applications host metrics based on the metadata of hosted apps
    * updates metadata of hosted apps if needed
    * @param elements
-   * @param metadataManager
    */
   @Override
-  public void putMetrics(Collection<TimelineMetric> elements, TimelineMetricMetadataManager metadataManager) {
-    Map<String, TimelineMetricHostMetadata> hostMetadata = metadataManager.getHostedAppsCache();
+  public void putMetrics(Collection<TimelineMetric> elements) {
+    Map<String, TimelineMetricHostMetadata> hostMetadata = metricMetadataManager.getHostedAppsCache();
     for (TimelineMetric metric : elements) {
       if (shouldBeSkipped(metric.getMetricName())) {
         if (LOG.isDebugEnabled()) {
@@ -260,8 +262,27 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach
 
   private void updateAppAggregatesFromHostMetric(TimelineClusterMetric key, MetricClusterAggregate newMetricClusterAggregate, TimelineMetricHostMetadata timelineMetricHostMetadata) {
     for (String appId : timelineMetricHostMetadata.getHostedApps().keySet()) {
-      TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(key.getMetricName(), appId, key.getInstanceId(), key.getTimestamp());
-      putMetricIntoCache(timelineClusterMetric, newMetricClusterAggregate);
+      if (appIdsToAggregate.contains(appId)) {
+        TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(key.getMetricName(), appId, key.getInstanceId(), key.getTimestamp());
+
+        TimelineMetricMetadataKey metadataKey = new TimelineMetricMetadataKey(timelineClusterMetric.getMetricName(), appId, timelineClusterMetric.getInstanceId());
+        TimelineMetricMetadata metricMetadata = metricMetadataManager.getMetadataCacheValue(metadataKey);
+
+        if (metricMetadata == null || metricMetadata.getUuid() == null) {
+          TimelineMetricMetadataKey metricMetadataKey = new TimelineMetricMetadataKey(timelineClusterMetric.getMetricName(), HOST_APP_ID, timelineClusterMetric.getInstanceId());
+          metricMetadata = metricMetadataManager.getMetadataCacheValue(metricMetadataKey);
+          if (metricMetadata != null) {
+            TimelineMetricMetadata timelineMetricMetadata = new TimelineMetricMetadata(timelineClusterMetric.getMetricName(),
+              appId, timelineClusterMetric.getInstanceId(), metricMetadata.getUnits(), metricMetadata.getType(), metricMetadata.getSeriesStartTime(),
+              metricMetadata.isSupportsAggregates(), TimelineMetricsFilter.acceptMetric(timelineClusterMetric.getMetricName(), appId));
+            byte[] uuid = metricMetadataManager.getUuid(timelineClusterMetric.getMetricName(), appId, timelineClusterMetric.getInstanceId(), StringUtils.EMPTY, true);
+            timelineMetricMetadata.setUuid(uuid);
+            metricMetadataManager.putIfModifiedTimelineMetricMetadata(timelineMetricMetadata);
+          }
+        }
+
+        putMetricIntoCache(timelineClusterMetric, newMetricClusterAggregate);
+      }
     }
   }
 
index 1d0f97a..239a1d5 100644 (file)
@@ -67,17 +67,18 @@ public class TimelineMetricsIgniteCacheTest {
     conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost");
     replayAll();
 
-    timelineMetricsIgniteCache = new TimelineMetricsIgniteCache();
+    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]).once();
+    expect(metricMetadataManagerMock.getHostedAppsCache()).andReturn(new HashMap<>()).anyTimes();
+    replay(metricMetadataManagerMock);
+
+    timelineMetricsIgniteCache = new TimelineMetricsIgniteCache(metricMetadataManagerMock);
   }
 
   @Test
   public void putEvictMetricsFromCacheSlicesMerging() throws Exception {
     long cacheSliceIntervalMillis = 30000L;
 
-    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
-    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]).once();
-    replay(metricMetadataManagerMock);
-
     long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
 
     long seconds = 1000;
@@ -103,7 +104,7 @@ public class TimelineMetricsIgniteCacheTest {
 
     Collection<TimelineMetric> timelineMetrics = new ArrayList<>();
     timelineMetrics.add(timelineMetric);
-    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics);
     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
 
     Assert.assertEquals(aggregateMap.size(), 2);
@@ -151,7 +152,7 @@ public class TimelineMetricsIgniteCacheTest {
     timelineMetrics = new ArrayList<>();
     timelineMetrics.add(timelineMetric);
     timelineMetrics.add(timelineMetric2);
-    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics);
     aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
 
     Assert.assertEquals(aggregateMap.size(), 2);
@@ -177,11 +178,6 @@ public class TimelineMetricsIgniteCacheTest {
 
     long cacheSliceIntervalMillis = 30000L;
 
-    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
-    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]).once();
-    expect(metricMetadataManagerMock.getHostedAppsCache()).andReturn(new HashMap<>()).anyTimes();
-    replay(metricMetadataManagerMock);
-
     long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
 
     long seconds = 1000;
@@ -216,7 +212,7 @@ public class TimelineMetricsIgniteCacheTest {
     timelineMetric.setMetricValues(metricValues);
     timelineMetrics.add(timelineMetric);
 
-    timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+    timelineMetricsIgniteCache.putMetrics(timelineMetrics);
 
     Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
     Assert.assertEquals(aggregateMap.size(), 6);
index 34d470c..288c4b5 100644 (file)
@@ -52,6 +52,7 @@ import static org.powermock.api.easymock.PowerMock.replayAll;
 public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest {
 
   private static TimelineMetricsIgniteCache timelineMetricsIgniteCache;
+  private static TimelineMetricMetadataManager metricMetadataManagerMock;
   @BeforeClass
   public static void setupConf() throws Exception {
     TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new
@@ -60,8 +61,11 @@ public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest {
     expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
     conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost");
     replayAll();
+    metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes();
+    replay(metricMetadataManagerMock);
 
-    timelineMetricsIgniteCache = new TimelineMetricsIgniteCache();
+    timelineMetricsIgniteCache = new TimelineMetricsIgniteCache(metricMetadataManagerMock);
   }
 
   @Test
@@ -71,9 +75,6 @@ public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest {
 
     Configuration configuration = new Configuration();
 
-    TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
-    expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes();
-    replay(metricMetadataManagerMock);
 
     TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = new TimelineMetricClusterAggregatorSecondWithCacheSource(
         METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null,