*/
package org.apache.ambari.metrics.core.timeline;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
-import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY;
-import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
-import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
-import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
-import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaMetricPatterns;
-
-import java.net.MalformedURLException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
-import javax.cache.Cache;
-import javax.cache.expiry.CreatedExpiryPolicy;
-import javax.cache.expiry.Duration;
-
+import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
+import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorHAHelper;
-import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
-import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata;
-import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.ssl.SslContextFactory;
+import javax.cache.Cache;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
+import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaMetricPatterns;
+
public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCache {
private static final Log LOG =
LogFactory.getLog(TimelineMetricsIgniteCache.class);
private boolean interpolationEnabled;
private List<String> skipAggrPatternStrings = new ArrayList<>();
private List<String> appIdsToAggregate;
+ private TimelineMetricMetadataManager metricMetadataManager;
- public TimelineMetricsIgniteCache() throws MalformedURLException, URISyntaxException {
+ public TimelineMetricsIgniteCache(TimelineMetricMetadataManager metricMetadataManager) throws MalformedURLException, URISyntaxException {
TimelineMetricConfiguration timelineMetricConfiguration = TimelineMetricConfiguration.getInstance();
Configuration metricConf = timelineMetricConfiguration.getMetricsConf();
Configuration sslConf = timelineMetricConfiguration.getMetricsSslConf();
IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
+ this.metricMetadataManager = metricMetadataManager;
//TODO add config to disable logging
* calculates applications host metrics based on the metadata of hosted apps
* updates metadata of hosted apps if needed
* @param elements
- * @param metadataManager
*/
@Override
- public void putMetrics(Collection<TimelineMetric> elements, TimelineMetricMetadataManager metadataManager) {
- Map<String, TimelineMetricHostMetadata> hostMetadata = metadataManager.getHostedAppsCache();
+ public void putMetrics(Collection<TimelineMetric> elements) {
+ Map<String, TimelineMetricHostMetadata> hostMetadata = metricMetadataManager.getHostedAppsCache();
for (TimelineMetric metric : elements) {
if (shouldBeSkipped(metric.getMetricName())) {
if (LOG.isDebugEnabled()) {
// NOTE(review): diff-formatted hunk — '-' lines are removed and '+' lines are added by this
// patch. Resolved behavior: for every app hosted on the metric's source host, re-attribute the
// host-level aggregate to that app's cluster metric, now restricted to whitelisted appIds and
// with on-demand creation of per-(metric, appId) metadata/uuid before caching.
private void updateAppAggregatesFromHostMetric(TimelineClusterMetric key, MetricClusterAggregate newMetricClusterAggregate, TimelineMetricHostMetadata timelineMetricHostMetadata) {
for (String appId : timelineMetricHostMetadata.getHostedApps().keySet()) {
- TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(key.getMetricName(), appId, key.getInstanceId(), key.getTimestamp());
- putMetricIntoCache(timelineClusterMetric, newMetricClusterAggregate);
+ // Only fan out to apps in the configured aggregation whitelist; previously every hosted
+ // app received the aggregate unconditionally.
+ if (appIdsToAggregate.contains(appId)) {
+ TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(key.getMetricName(), appId, key.getInstanceId(), key.getTimestamp());
+
+ // Look up existing metadata for the (metricName, appId, instanceId) triple.
+ TimelineMetricMetadataKey metadataKey = new TimelineMetricMetadataKey(timelineClusterMetric.getMetricName(), appId, timelineClusterMetric.getInstanceId());
+ TimelineMetricMetadata metricMetadata = metricMetadataManager.getMetadataCacheValue(metadataKey);
+
+ // No app-scoped metadata (or no uuid) yet: clone it from the host-level (HOST_APP_ID)
+ // metadata for the same metric, if that exists, and register a fresh uuid for the pair.
+ if (metricMetadata == null || metricMetadata.getUuid() == null) {
+ TimelineMetricMetadataKey metricMetadataKey = new TimelineMetricMetadataKey(timelineClusterMetric.getMetricName(), HOST_APP_ID, timelineClusterMetric.getInstanceId());
+ metricMetadata = metricMetadataManager.getMetadataCacheValue(metricMetadataKey);
+ if (metricMetadata != null) {
+ // Copy units/type/seriesStartTime/supportsAggregates from the host metadata; the
+ // isWhitelisted flag is re-evaluated for this appId via TimelineMetricsFilter.
+ TimelineMetricMetadata timelineMetricMetadata = new TimelineMetricMetadata(timelineClusterMetric.getMetricName(),
+ appId, timelineClusterMetric.getInstanceId(), metricMetadata.getUnits(), metricMetadata.getType(), metricMetadata.getSeriesStartTime(),
+ metricMetadata.isSupportsAggregates(), TimelineMetricsFilter.acceptMetric(timelineClusterMetric.getMetricName(), appId));
+ // 'true' => create the uuid if absent; empty string = no hostname (cluster scope).
+ byte[] uuid = metricMetadataManager.getUuid(timelineClusterMetric.getMetricName(), appId, timelineClusterMetric.getInstanceId(), StringUtils.EMPTY, true);
+ timelineMetricMetadata.setUuid(uuid);
+ metricMetadataManager.putIfModifiedTimelineMetricMetadata(timelineMetricMetadata);
+ }
+ }
+
+ putMetricIntoCache(timelineClusterMetric, newMetricClusterAggregate);
+ }
}
}
conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost");
replayAll();
- timelineMetricsIgniteCache = new TimelineMetricsIgniteCache();
+ TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
+ expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]).once();
+ expect(metricMetadataManagerMock.getHostedAppsCache()).andReturn(new HashMap<>()).anyTimes();
+ replay(metricMetadataManagerMock);
+
+ timelineMetricsIgniteCache = new TimelineMetricsIgniteCache(metricMetadataManagerMock);
}
@Test
public void putEvictMetricsFromCacheSlicesMerging() throws Exception {
long cacheSliceIntervalMillis = 30000L;
- TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
- expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]).once();
- replay(metricMetadataManagerMock);
-
long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
long seconds = 1000;
Collection<TimelineMetric> timelineMetrics = new ArrayList<>();
timelineMetrics.add(timelineMetric);
- timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+ timelineMetricsIgniteCache.putMetrics(timelineMetrics);
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
Assert.assertEquals(aggregateMap.size(), 2);
timelineMetrics = new ArrayList<>();
timelineMetrics.add(timelineMetric);
timelineMetrics.add(timelineMetric2);
- timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+ timelineMetricsIgniteCache.putMetrics(timelineMetrics);
aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
Assert.assertEquals(aggregateMap.size(), 2);
long cacheSliceIntervalMillis = 30000L;
- TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class);
- expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class), anyBoolean())).andReturn(new byte[16]).once();
- expect(metricMetadataManagerMock.getHostedAppsCache()).andReturn(new HashMap<>()).anyTimes();
- replay(metricMetadataManagerMock);
-
long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis);
long seconds = 1000;
timelineMetric.setMetricValues(metricValues);
timelineMetrics.add(timelineMetric);
- timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock);
+ timelineMetricsIgniteCache.putMetrics(timelineMetrics);
Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds);
Assert.assertEquals(aggregateMap.size(), 6);