[AMBARI-25078] Add metering metrics to AMS Metric Monitor. (#13) master
authoravijayanhwx <avijayan@hortonworks.com>
Wed, 2 Jan 2019 21:04:33 +0000 (13:04 -0800)
committerGitHub <noreply@github.com>
Wed, 2 Jan 2019 21:04:33 +0000 (13:04 -0800)
22 files changed:
ambari-metrics-assembly/pom.xml
ambari-metrics-assembly/src/main/assembly/monitor.xml
ambari-metrics-grafana/ambari-metrics/datasource.js
ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
ambari-metrics-host-monitoring/conf/unix/instance_type_provider_azure [new file with mode: 0644]
ambari-metrics-host-monitoring/conf/unix/instance_type_provider_ec2 [new file with mode: 0644]
ambari-metrics-host-monitoring/conf/unix/instance_type_provider_gce [new file with mode: 0644]
ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
ambari-metrics-host-monitoring/src/main/python/core/controller.py
ambari-metrics-host-monitoring/src/main/python/core/instance_type_provider.py [new file with mode: 0644]
ambari-metrics-host-monitoring/src/main/python/core/metering.py [new file with mode: 0644]
ambari-metrics-host-monitoring/src/main/python/core/metric_collector.py
ambari-metrics-host-monitoring/src/test/python/core/TestMetricCollector.py
ambari-metrics-timelineservice/conf/unix/metrics_whitelist
ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricHostAggregator.java
ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricHostAggregator.java
ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java

index b76b928..0b17ea0 100644 (file)
                     <filemode>755</filemode>
                   </mapper>
                 </data>
-
+                <data>
+                  <src>${monitor.dir}/conf/unix/instance_type_provider_azure</src>
+                  <type>file</type>
+                  <mapper>
+                    <type>perm</type>
+                    <prefix>/etc/ambari-metrics-monitor/conf</prefix>
+                    <filemode>755</filemode>
+                  </mapper>
+                </data>
+                <data>
+                  <src>${monitor.dir}/conf/unix/instance_type_provider_ec2</src>
+                  <type>file</type>
+                  <mapper>
+                    <type>perm</type>
+                    <prefix>/etc/ambari-metrics-monitor/conf</prefix>
+                    <filemode>755</filemode>
+                  </mapper>
+                </data>
+                <data>
+                  <src>${monitor.dir}/conf/unix/instance_type_provider_gce</src>
+                  <type>file</type>
+                  <mapper>
+                    <type>perm</type>
+                    <prefix>/etc/ambari-metrics-monitor/conf</prefix>
+                    <filemode>755</filemode>
+                  </mapper>
+                </data>
                 <!-- Metric collector -->
 
                 <data>
index 7d098e9..aa12bef 100644 (file)
@@ -36,6 +36,9 @@
       <includes>
         <include>metric_groups.conf</include>
         <include>metric_monitor.ini</include>
+        <include>instance_type_provider_azure</include>
+        <include>instance_type_provider_ec2</include>
+        <include>instance_type_provider_gce</include>
       </includes>
     </fileSet>
     <fileSet>
index 18e2709..6dc0446 100644 (file)
@@ -112,21 +112,33 @@ define([
               return $q.when(emptyData(target));
             }
             var series = [];
-            var metricData = res.metrics[0].metrics;
-            // Added hostname to legend for templated dashboards.
-            var hostLegend = res.metrics[0].hostname ? ' on ' + res.metrics[0].hostname : '';
-            var timeSeries = {};
-            timeSeries = {
-              target: alias + hostLegend,
-              datapoints: []
-            };
-            for (var k in metricData) {
-              if (metricData.hasOwnProperty(k)) {
-                timeSeries.datapoints.push([metricData[k], (k - k % 1000)]);
-              }
-            }
-            series.push(timeSeries);
-            return $q.when({data: series});
+            var metricData = res.metrics;
+            _.map(metricData, function (data) {
+              // Added hostname to legend for templated dashboards.
+              var hostLegend = data.hostname ? ' on ' + data.hostname : '';
+              var alias = target.alias ? target.alias : target.metric;
+              if(!_.isEmpty(templateSrv.variables) && templateSrv.variables[0].query === "yarnqueues") {
+                alias = alias + ' on ' + target.qmetric; }
+              if(!_.isEmpty(templateSrv.variables) && templateSrv.variables[0].query === "kafka-topics") {
+                alias = alias + ' on ' + target.kbTopic; }
+              if (!alias.includes("%") || !data.metricname.includes('live_hosts')) {
+                if (!alias || alias.includes("%")) {
+                  alias = data.metricname;
+                }
+              var timeSeries = {};
+              timeSeries = {
+                target: alias + hostLegend,
+                datapoints: []
+                };
+                for (var k in data.metrics) {
+                  if (data.metrics.hasOwnProperty(k)) {
+                    timeSeries.datapoints.push([data.metrics[k], (k - k % 1000)]);
+                  }
+                }
+                series.push(timeSeries);
+                }
+               });
+               return $q.when({data: series});
           };
         };
         // To speed up querying on templatized dashboards.
@@ -200,11 +212,12 @@ define([
                 aliasSuffix = '';
               }
               if (data.appid.indexOf('ambari_server') === 0) {
-                alias = data.metricname;
                 aliasSuffix = '';
               }
+              if (!alias || alias.includes("%")) {
+                alias = data.metricname;
+              }
               timeSeries = {
-                target: alias + aliasSuffix,
                 datapoints: []
               };
               for (var k in data.metrics) {
index fa0c8fb..2b20379 100644 (file)
@@ -73,6 +73,7 @@ public class AggregatedMetricsPublisher extends AbstractMetricPublisher {
             double max = Integer.MIN_VALUE;
             double min = Integer.MAX_VALUE;
             int count = 0;
+            TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().iterator().next());
             for (TimelineMetric metric : metrics.getMetrics()) {
                 for (Double value : metric.getMetricValues().values()) {
                     sum+=value;
@@ -80,8 +81,10 @@ public class AggregatedMetricsPublisher extends AbstractMetricPublisher {
                     min = Math.min(min, value);
                     count++;
                 }
+                if (metric.getStartTime() > tmpMetric.getStartTime()) {
+                    tmpMetric.setStartTime(metric.getStartTime());
+                }
             }
-            TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
             tmpMetric.setMetricValues(new TreeMap<Long, Double>());
             metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
         }
diff --git a/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_azure b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_azure
new file mode 100644 (file)
index 0000000..3c8ec20
--- /dev/null
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific
+
+curl --silent -H Metadata:true "http://169.254.169.254/metadata/instance?api-version=2017-12-01" | grep -Po '"vmSize":.*?[^\\]",' | cut -d':' -f2 | sed 's/,/ /g' | sed 's/"//g'
\ No newline at end of file
diff --git a/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_ec2 b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_ec2
new file mode 100644 (file)
index 0000000..7e89f9e
--- /dev/null
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific
+
+curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document | grep 'instanceType' | awk '{ print $3 }' | sed 's/,/ /g' | sed 's/"//g'
\ No newline at end of file
diff --git a/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_gce b/ambari-metrics-host-monitoring/conf/unix/instance_type_provider_gce
new file mode 100644 (file)
index 0000000..6c237b9
--- /dev/null
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific
+
+curl --silent -H "Metadata-Flavor: Google" "http://metadata.google.internal/computeMetadata/v1/instance/machine-type" | awk -F'/' '{print $4}'
\ No newline at end of file
index bd957a0..59d4af0 100644 (file)
@@ -88,6 +88,7 @@ class ApplicationMetricMap:
       pass
   
       for appId, metrics in local_metric_map.iteritems():
+        current_app_id = "HOST" if "HOST" in appId else appId
         for metricId, metricData in dict(metrics).iteritems():
           # Create a timeline metric object
           result_instanceid = ""
@@ -96,7 +97,7 @@ class ApplicationMetricMap:
           timeline_metric = {
             "hostname" : self.hostname,
             "metricname" : metricId,
-            "appid" : "HOST",
+            "appid" : current_app_id,
             "instanceid" : result_instanceid,
             "starttime" : self.get_start_time(appId, metricId),
             "metrics" : self.align_values_by_minute_mark(appId, metricId, metricData) if clear_once_flattened else metricData
index 721a544..18ade5b 100644 (file)
@@ -255,6 +255,21 @@ class Configuration:
     hosts = self.get("aggregation", "ams_monitor_log_dir", "/var/log/ambari-metrics-monitor")
     return hosts
 
+  def is_metering_enabled(self):
+    return "true" == str(self.get("metering", "metering_enabled", "false")).lower()
+
+  def get_metering_appId(self):
+    return self.get("metering", "metering_appId", "metering")
+
+  def get_metering_metrics(self):
+    return self.get("metering", "metering_metrics", "").split(',')
+
+  def get_instance_type_script(self):
+    return self.get("metering", "instance_type_script", "").split(',')
+
+  def get_provider_type(self):
+    return self.get("metering", "host_provider_type", None)
+
   def ams_monitor_log_file(self):
     """
     :returns the log file
index d161269..080eaca 100644 (file)
@@ -47,7 +47,7 @@ class Controller(threading.Thread):
     self.application_metric_map = ApplicationMetricMap(hostinfo.get_hostname(),
                                                        hostinfo.get_ip_address())
     self.event_queue = Queue(config.get_max_queue_size())
-    self.metric_collector = MetricsCollector(self.event_queue, self.application_metric_map, hostinfo)
+    self.metric_collector = MetricsCollector(self.event_queue, self.application_metric_map, hostinfo, config)
     self.sleep_interval = config.get_collector_sleep_interval()
     self._stop_handler = stop_handler
     self.initialize_events_cache()
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/instance_type_provider.py b/ambari-metrics-host-monitoring/src/main/python/core/instance_type_provider.py
new file mode 100644 (file)
index 0000000..2e0a321
--- /dev/null
@@ -0,0 +1,76 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import subprocess
+import sys
+
+logger = logging.getLogger()
+
+class HostInstanceTypeProvider:
+
+  DEFAULT_INSTANCE_TYPE = "custom"
+  KNOWN_PROVIDER_SCRIPT_PREFIX = "instance_type_provider_"
+  KNOWN_PROVIDER_SCRIPTS = dict()
+
+  def __init__(self, config):
+
+    self.KNOWN_PROVIDER_SCRIPTS['google'] = config.get_config_dir() + self.KNOWN_PROVIDER_SCRIPT_PREFIX + "gce"
+    self.KNOWN_PROVIDER_SCRIPTS['microsoft'] = config.get_config_dir() + self.KNOWN_PROVIDER_SCRIPT_PREFIX + "azure"
+    self.KNOWN_PROVIDER_SCRIPTS['xen'] = config.get_config_dir() + self.KNOWN_PROVIDER_SCRIPT_PREFIX + "ec2"
+
+    self.provider_type = config.get_provider_type()
+    logger.info("Provider type {0}".format(self.provider_type))
+
+    script = self.get_script_for_provider(self.provider_type)
+    logger.info("Script for provider {0}".format(script))
+
+    self.instance_type_script = config.get_instance_type_script()
+    logger.info("Custom Instance Type Script {0}".format(self.instance_type_script))
+
+    if script:
+      self.instance_type = self.get_instance_type_from_script(script)
+    elif self.instance_type_script:
+      self.instance_type = self.get_instance_type_from_script(self.instance_type_script)
+    else:
+      self.instance_type = self.DEFAULT_INSTANCE_TYPE
+    logger.info("Instance type {0}".format(self.instance_type))
+
+  def get_instance_type(self):
+    return self.instance_type
+
+  def get_script_for_provider(self, provider_type):
+    p_type = str(provider_type).lower()
+    if provider_type and p_type in self.KNOWN_PROVIDER_SCRIPTS:
+      return self.KNOWN_PROVIDER_SCRIPTS[p_type]
+    return None
+
+  def get_instance_type_from_script(self, script):
+    instance_type = self.DEFAULT_INSTANCE_TYPE
+    if script:
+      try:
+        osStat = subprocess.Popen([script], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        out, err = osStat.communicate()
+        if 0 == osStat.returncode and 0 != len(out.strip()):
+          instance_type = out.strip()
+          logger.info("Read instance_type '{0}' using script '{1}'".format(instance_type, script))
+      except:
+        logger.warn("Unexpected error while retrieving instance_type: '{0}'".format(sys.exc_info()))
+    return instance_type
\ No newline at end of file
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/metering.py b/ambari-metrics-host-monitoring/src/main/python/core/metering.py
new file mode 100644 (file)
index 0000000..bac71e6
--- /dev/null
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import time
+import json
+from instance_type_provider import HostInstanceTypeProvider
+
+logger = logging.getLogger()
+
+class MeteringMetricHandler:
+
+  METERING_ALIVE_TIME_METRIC_SUFFIX = "lastKnownAliveTime"
+
+  ## At startup,
+  def __init__(self, config):
+    self.appId = config.get_metering_appId()
+    self.instance_type_metric_appId = config.get_metering_appId() + "_instance_type"
+    self.metering_enabled = config.is_metering_enabled()
+    self.hostname = config.get_hostname_config()
+    self.instance_id = config.get_instanceid()
+    self.metering_metric_list = config.get_metering_metrics()
+    self.start_ts = int(round(time.time() * 1000))
+    if self.metering_enabled:
+      logger.info("Metering started with: appId = {0}, metering_metric_list = {1}, start time key = {2}"
+                   .format(self.appId, self.metering_metric_list, self.start_ts))
+      self.instance_type_provider = HostInstanceTypeProvider(config)
+      self.instance_type = self.instance_type_provider.instance_type
+      self.metering_metric_key_prefix = self.hostname + "~" + self.instance_type + "~" + str(self.start_ts)
+    pass
+
+  # Metering Metrics
+  def get_metering_metrics(self, metrics):
+    metering_metrics = {}
+    curr_time = int(round(time.time() * 1000))
+    for metric_name, value in metrics.iteritems():
+      if metric_name in self.metering_metric_list:
+        end_time_metric_key = self.metering_metric_key_prefix + "~" + metric_name + "~" + str(value) + "~" + self.METERING_ALIVE_TIME_METRIC_SUFFIX
+        metering_metrics[end_time_metric_key] = curr_time
+
+    return metering_metrics
+
+    # Instance Type Metrics
+  def get_instance_type_metrics(self):
+    metering_metrics = {self.instance_type: 1}
+    return metering_metrics
\ No newline at end of file
index 84a4d76..f728c2d 100644 (file)
@@ -20,8 +20,8 @@ limitations under the License.
 
 import logging
 from time import time
-from host_info import HostInfo
 from event_definition import HostMetricCollectEvent, ProcessMetricCollectEvent
+from metering import MeteringMetricHandler
 
 logger = logging.getLogger()
 
@@ -34,10 +34,12 @@ class MetricsCollector():
   not required if Timer class is used for metric groups.
   """
 
-  def __init__(self, emit_queue, application_metric_map, host_info):
+  def __init__(self, emit_queue, application_metric_map, host_info, config):
     self.emit_queue = emit_queue
     self.application_metric_map = application_metric_map
     self.host_info = host_info
+    self.metering_enabled = config.is_metering_enabled()
+    self.metering_handler = MeteringMetricHandler(config)
   pass
 
   def process_event(self, event):
@@ -86,6 +88,12 @@ class MetricsCollector():
 
     if metrics:
       self.application_metric_map.put_metric(DEFAULT_HOST_APP_ID, metrics, startTime)
+      if self.metering_enabled:
+        metering_metrics = self.metering_handler.get_metering_metrics(metrics)
+        self.application_metric_map.put_metric(self.metering_handler.appId, metering_metrics, startTime)
+
+        instance_type_metrics = self.metering_handler.get_instance_type_metrics()
+        self.application_metric_map.put_metric(self.metering_handler.instance_type_metric_appId, instance_type_metrics, startTime)
     pass
 
   def process_process_collection_event(self, event):
index 11a85eb..64df3e6 100644 (file)
@@ -25,6 +25,7 @@ from core.application_metric_map import ApplicationMetricMap
 from core.metric_collector import MetricsCollector
 from core.event_definition import HostMetricCollectEvent
 from core.host_info import HostInfo
+from core.config_reader import Configuration
 
 logger = logging.getLogger()
 
@@ -36,7 +37,7 @@ class TestMetricCollector(TestCase):
     amm_mock.return_value = None
     host_info_mock.return_value = {'metric_name' : 'metric_value'}
 
-    metric_collector = MetricsCollector(None, amm_mock, host_info_mock)
+    metric_collector = MetricsCollector(None, amm_mock, host_info_mock, Configuration())
 
     group_config = {'collect_every' : 1, 'metrics' : 'cpu'}
     
index fd03d6e..31dc255 100644 (file)
@@ -651,4 +651,5 @@ yarn.QueueMetrics.Queue=root.running_60
 yarn.TimelineDataManagerMetrics.GetEntitiesTimeAvgTime
 yarn.TimelineDataManagerMetrics.GetEntitiesTotal
 yarn.TimelineDataManagerMetrics.PostEntitiesTimeAvgTime
-yarn.TimelineDataManagerMetrics.PostEntitiesTotal
\ No newline at end of file
+yarn.TimelineDataManagerMetrics.PostEntitiesTotal
+._p_*lastKnownAliveTime
\ No newline at end of file
index 9753f89..44e714b 100644 (file)
@@ -81,7 +81,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
 
   @Override
   protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
-    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
+    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime - 1000l);
 
     LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
     hBaseAccessor.saveClusterAggregateRecordsSecond(hostAggregateMap, outputTableName);
index a9ee385..974dbb9 100644 (file)
@@ -63,7 +63,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
   @Override
   protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
 
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime - 1000l);
 
     LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
     hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, outputTableName);
index 9c255e7..4a9a6be 100644 (file)
@@ -74,7 +74,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
      */
 
     condition.setStatement(String.format(GET_AGGREGATED_APP_METRIC_GROUPBY_SQL,
-      outputTableName, endTime, aggregateColumnName, tableName,
+      outputTableName, endTime - 1000l, aggregateColumnName, tableName,
       getDownsampledMetricSkipClause(), startTime, endTime));
 
     if (LOG.isDebugEnabled()) {
index 72d7424..1077ff8 100644 (file)
@@ -83,7 +83,7 @@ public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAgg
     condition.setDoUpdate(true);
 
     condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
-      outputTableName, endTime, tableName,
+      outputTableName, endTime - 1000l, tableName,
       getDownsampledMetricSkipClause() + getIncludedUuidsClause(uuids), startTime, endTime));
 
     if (LOG.isDebugEnabled()) {
index f8757a4..5779602 100644 (file)
@@ -65,7 +65,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
     condition.setDoUpdate(true);
 
     condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
-      outputTableName, endTime, tableName,
+      outputTableName, endTime - 1000l, tableName,
       getDownsampledMetricSkipClause(), startTime, endTime));
 
     if (LOG.isDebugEnabled()) {
index 092da51..3a98804 100644 (file)
@@ -389,7 +389,7 @@ public class PhoenixTransactSQL {
     "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
     "SELECT UUID, %s AS SERVER_TIME, " +
     "SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
-    "FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s GROUP BY UUID";
+    "FROM %s WHERE%s SERVER_TIME >= %s AND SERVER_TIME < %s GROUP BY UUID";
 
   /**
    * Downsample host metrics.
@@ -407,8 +407,8 @@ public class PhoenixTransactSQL {
    */
   public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT " +
          "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT UUID, %s AS SERVER_TIME, " +
-         "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME > %s AND " +
-         "SERVER_TIME <= %s GROUP BY UUID";
+         "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME >= %s AND " +
+         "SERVER_TIME < %s GROUP BY UUID";
 
   /**
    * Downsample cluster metrics.
index 08c06a9..6d0749f 100644 (file)
@@ -304,7 +304,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
       "host", null);
 
     long endTime = startTime + 1000 * 60 * 4;
-    boolean success = aggregatorMinute.doWork(startTime - 1, endTime);
+    boolean success = aggregatorMinute.doWork(startTime - 1, endTime + 1);
     assertTrue(success);
 
     Condition condition = new DefaultCondition(uuids, metricNames, Collections.singletonList("local"), "host", null, startTime,