[GOBBLIN-1672] Refactor metrics from DagManager into its own class, add metrics per executor for SUCCESS, FAILED, SLA_EXCEEDED, START_SLA_EXCEEDED (master)
author William Lo <lo.william97@gmail.com>
Tue, 9 Aug 2022 22:02:31 +0000 (15:02 -0700)
committer GitHub <noreply@github.com>
Tue, 9 Aug 2022 22:02:31 +0000 (15:02 -0700)
* Refactor metrics from DagManager into its own class, add metrics per executor for SUCCESS, FAILED, SLA_EXCEEDED, START_SLA_EXCEEDED

* Address review, fix flow gauge for failed flows
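The heart of the change is a lazily-created meter per SpecExecutor, keyed by executor name (see DagManagerMetrics.getExecutorMeterForDag in the new file below). A minimal, self-contained sketch of that pattern, using plain Dropwizard meters instead of Gobblin's ContextAwareMeter; "executorOne" is a sample executor URI, not a value from this commit:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

public class PerExecutorMeterSketch {
  private static final MetricRegistry registry = new MetricRegistry();
  // One meter per executor, created on first use and cached, mirroring the
  // computeIfAbsent call in DagManagerMetrics.getExecutorMeterForDag.
  private static final Map<String, Meter> executorMeters = new ConcurrentHashMap<>();

  static Meter meterFor(String executorName, String meterName) {
    return executorMeters.computeIfAbsent(executorName,
        name -> registry.meter(MetricRegistry.name("GobblinService", name, meterName)));
  }

  public static void main(String[] args) {
    meterFor("executorOne", "SlaExceededFlows").mark();
    System.out.println(registry.getMeters().keySet()); // [GobblinService.executorOne.SlaExceededFlows]
  }
}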

gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java [new file with mode: 0644]
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/TestServiceMetrics.java

index 3bd4083a1241faa16508bca5d634335406478676..eb8ff896c30b62550936500fecff595f8ae7017d 100644 (file)
@@ -42,6 +42,7 @@ public class ServiceMetricNames {
   public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
   public static final String SUCCESSFUL_FLOW_METER = "SuccessfulFlows";
   public static final String START_SLA_EXCEEDED_FLOWS_METER = "StartSLAExceededFlows";
+  public static final String SLA_EXCEEDED_FLOWS_METER = "SlaExceededFlows";
   public static final String FAILED_FLOW_METER = "FailedFlows";
   public static final String SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX + ".ScheduledFlows";
   public static final String NON_SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX + ".NonScheduledFlows";
@@ -50,6 +51,7 @@ public class ServiceMetricNames {
   public static final String SERVICE_USERS = "ServiceUsers";
   public static final String COMPILED = "Compiled";
   public static final String RUNNING_STATUS = "RunningStatus";
+  public static final String JOBS_SENT_TO_SPEC_EXECUTOR = "JobsSentToSpecExecutor";
 
   public static final String HELIX_LEADER_STATE = "HelixLeaderState";
 }
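Combined with GOBBLIN_SERVICE_PREFIX (and, for per-executor meters, the executor name), the two new constants above resolve to dotted metric names. A short sketch of the names a metrics reporter would see, following the MetricRegistry.name(...) calls in DagManagerMetrics below; "executorOne" is a sample executor URI:

import com.codahale.metrics.MetricRegistry;
import org.apache.gobblin.metrics.ServiceMetricNames;

public class NewMetricNamesSketch {
  public static void main(String[] args) {
    // Aggregate meter, marked once per SLA-exceeded flow: GobblinService.SlaExceededFlows
    System.out.println(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
        ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER));
    // Per-executor meter, marked on each job submission: GobblinService.executorOne.JobsSentToSpecExecutor
    System.out.println(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
        "executorOne", ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
  }
}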
index dcfbe9cdfe9b96952d058121d5528dc74ac347b1..f423277cf652c6ee373b54870f2e5a6eb51af31c 100644 (file)
@@ -18,7 +18,6 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import com.codahale.metrics.Meter;
-import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -40,9 +39,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.lang3.StringUtils;
-
-import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
@@ -61,15 +57,11 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.metrics.metric.filter.MetricNameRegexFilter;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -77,8 +69,6 @@ import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.FlowId;
-import org.apache.gobblin.service.RequesterService;
-import org.apache.gobblin.service.ServiceRequester;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -186,6 +176,7 @@ public class DagManager extends AbstractIdleService {
   private final Config config;
   private final Optional<EventSubmitter> eventSubmitter;
   private final long failedDagRetentionTime;
+  private final DagManagerMetrics dagManagerMetrics;
 
   private volatile boolean isActive = false;
 
@@ -199,12 +190,14 @@ public class DagManager extends AbstractIdleService {
     this.pollingInterval = ConfigUtils.getInt(config, JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
     this.retentionPollingInterval = ConfigUtils.getInt(config, FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
     this.instrumentationEnabled = instrumentationEnabled;
+    MetricContext metricContext = null;
     if (instrumentationEnabled) {
-      MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+      metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
       this.eventSubmitter = Optional.of(new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
     } else {
       this.eventSubmitter = Optional.absent();
     }
+    this.dagManagerMetrics = new DagManagerMetrics(metricContext);
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
     this.jobStatusRetriever = jobStatusRetriever;
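With instrumentation disabled, the constructor above hands DagManagerMetrics a null MetricContext, and the meter-marking methods in the new class no-op behind their metricContext != null guards. A hedged sketch of that contract; the FlowId values are samples:

import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;

public class UninstrumentedSketch {
  public static void main(String[] args) {
    DagManagerMetrics metrics = new DagManagerMetrics(null); // instrumentation disabled
    metrics.activate(); // creates no meters: metricContext is null
    // Marking methods are likewise silent no-ops under a null context
    metrics.emitFlowFailedMetrics(new FlowId().setFlowGroup("group0").setFlowName("flow0"));
  }
}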
@@ -355,18 +348,7 @@ public class DagManager extends AbstractIdleService {
                 topologySpecMap);
         Set<String> failedDagIds = Collections.synchronizedSet(failedDagStateStore.getDagIds());
 
-        ContextAwareMeter allSuccessfulMeter = null;
-        ContextAwareMeter allFailedMeter = null;
-        // TODO: create a map of RatioGauges for success/failed per executor, will require preprocessing of available executors
-        Map<String, ContextAwareMeter> startSlaExceededMeters = Maps.newConcurrentMap();
-
-        if (instrumentationEnabled) {
-          MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
-          allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
-              ServiceMetricNames.SUCCESSFUL_FLOW_METER));
-          allFailedMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
-              ServiceMetricNames.FAILED_FLOW_METER));
-        }
+        this.dagManagerMetrics.activate();
 
         UserQuotaManager quotaManager = new UserQuotaManager(config);
         // Before initializing the DagManagerThreads check which dags are currently running before shutdown
@@ -382,8 +364,8 @@ public class DagManager extends AbstractIdleService {
         this.dagManagerThreads = new DagManagerThread[numThreads];
         for (int i = 0; i < numThreads; i++) {
           DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
-              runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, failedDagIds, allSuccessfulMeter,
-              allFailedMeter, startSlaExceededMeters, this.defaultJobStartSlaTimeMillis, quotaManager, i);
+              runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, failedDagIds, this.dagManagerMetrics,
+              this.defaultJobStartSlaTimeMillis, quotaManager, i);
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
         }
@@ -397,9 +379,7 @@ public class DagManager extends AbstractIdleService {
       } else { //Mark the DagManager inactive.
         log.info("Inactivating the DagManager. Shutting down all DagManager threads");
         this.scheduledExecutorPool.shutdown();
-        // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
-        // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
-        RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
+        this.dagManagerMetrics.cleanup();
         try {
           this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
@@ -412,11 +392,6 @@ public class DagManager extends AbstractIdleService {
     }
   }
 
-  @VisibleForTesting
-  protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
-    return new MetricNameRegexFilter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "\\..*\\." + ServiceMetricNames.RUNNING_STATUS);
-  }
-
   /**
    * Each {@link DagManagerThread} performs 2 actions when scheduled:
    * <ol>
@@ -439,12 +414,7 @@ public class DagManager extends AbstractIdleService {
     private final Optional<EventSubmitter> eventSubmitter;
     private final Optional<Timer> jobStatusPolledTimer;
     private final AtomicLong orchestrationDelay = new AtomicLong(0);
-    private static final Map<String, FlowState> flowGauges = Maps.newConcurrentMap();
-    private final ContextAwareMeter allSuccessfulMeter;
-    private final ContextAwareMeter allFailedMeter;
-    private static final Map<String, ContextAwareMeter> groupSuccessfulMeters = Maps.newConcurrentMap();
-    private static final Map<String, ContextAwareMeter> groupFailureMeters = Maps.newConcurrentMap();
-    private final Map<String, ContextAwareMeter> startSlaExceededMeters;
+    private final DagManagerMetrics dagManagerMetrics;
     private final UserQuotaManager quotaManager;
     private final JobStatusRetriever jobStatusRetriever;
     private final DagStateStore dagStateStore;
@@ -459,8 +429,7 @@ public class DagManager extends AbstractIdleService {
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
         BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
-        boolean instrumentationEnabled, Set<String> failedDagIds, ContextAwareMeter allSuccessfulMeter,
-        ContextAwareMeter allFailedMeter, Map<String, ContextAwareMeter> startSlaExceededMeters,
+        boolean instrumentationEnabled, Set<String> failedDagIds, DagManagerMetrics dagManagerMetrics,
         Long defaultJobStartSla, UserQuotaManager quotaManager, int dagMangerThreadId) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
@@ -469,11 +438,9 @@ public class DagManager extends AbstractIdleService {
       this.queue = queue;
       this.cancelQueue = cancelQueue;
       this.resumeQueue = resumeQueue;
-      this.allSuccessfulMeter = allSuccessfulMeter;
-      this.allFailedMeter = allFailedMeter;
+      this.dagManagerMetrics = dagManagerMetrics;
       this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
       this.quotaManager = quotaManager;
-      this.startSlaExceededMeters = startSlaExceededMeters;
 
       if (instrumentationEnabled) {
         this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
@@ -673,22 +640,13 @@ public class DagManager extends AbstractIdleService {
         if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
           addJobState(dagId, dagNode);
           //Update the running jobs counter.
-          getRunningJobsCounter(dagNode).inc();
-          getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
+          dagManagerMetrics.incrementRunningJobMetrics(dagNode);
           isDagRunning = true;
         }
       }
 
       FlowId flowId = DagManagerUtils.getFlowId(dag);
-      // Do not register flow-specific metrics for a flow
-      if (!flowGauges.containsKey(flowId.toString()) && DagManagerUtils.shouldFlowOutputMetrics(dag)) {
-        String flowStateGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId.getFlowGroup(),
-            flowId.getFlowName(), ServiceMetricNames.RUNNING_STATUS);
-        flowGauges.put(flowId.toString(), FlowState.RUNNING);
-        ContextAwareGauge<Integer> gauge = RootMetricContext
-            .get().newContextAwareGauge(flowStateGaugeName, () -> flowGauges.get(flowId.toString()).value);
-        RootMetricContext.get().register(flowStateGaugeName, gauge);
-      }
+      this.dagManagerMetrics.registerFlowMetric(flowId, dag);
 
       log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
       //Determine the next set of jobs to run and submit them for execution
@@ -699,7 +657,7 @@ public class DagManager extends AbstractIdleService {
 
       // Set flow status to running
       DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);
-      conditionallyUpdateFlowGaugeExecutionState(flowGauges, flowId, FlowState.RUNNING);
+      dagManagerMetrics.conditionallyMarkFlowAsState(flowId, FlowState.RUNNING);
 
       // Report the orchestration delay the first time the Dag is initialized. Orchestration delay is defined as
       // the time difference between the instant when a flow first transitions to the running state and the instant
@@ -805,14 +763,12 @@ public class DagManager extends AbstractIdleService {
             DagManagerUtils.getJobName(node),
             DagManagerUtils.getFullyQualifiedDagName(node),
             timeOutForJobStart);
+        dagManagerMetrics.incrementCountsStartSlaExceeded(node);
         cancelDagNode(node);
 
         String dagId = DagManagerUtils.generateDagId(node);
         this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
         this.dags.get(dagId).setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
-        if (this.metricContext != null) {
-          this.getExecutorMeterForDag(node, ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER, startSlaExceededMeters).mark();
-        }
         return true;
       } else {
         return false;
@@ -865,6 +821,7 @@ public class DagManager extends AbstractIdleService {
         log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
             node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
             node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+        dagManagerMetrics.incrementExecutorSlaExceeded(node);
         cancelDagNode(node);
 
         this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
@@ -964,9 +921,8 @@ public class DagManager extends AbstractIdleService {
         // Quota release is guaranteed, despite failure, because exception handling within would mark the job FAILED.
         // When the ensuing kafka message spurs DagManager processing, the quota is released and the counts decremented
         // Ensure that we do not double increment for flows that are retried
-        if (this.metricContext != null && dagNode.getValue().getCurrentAttempts() == 1) {
-          getRunningJobsCounter(dagNode).inc();
-          getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
+        if (dagNode.getValue().getCurrentAttempts() == 1) {
+          dagManagerMetrics.incrementRunningJobMetrics(dagNode);
         }
         // Submit the job to the SpecProducer, which in turn performs the actual job submission to the SpecExecutor instance.
         // The SpecProducer implementations submit the job to the underlying executor and return when the submission is complete,
@@ -985,6 +941,7 @@ public class DagManager extends AbstractIdleService {
           jobOrchestrationTimer.stop(jobMetadata);
         }
         log.info("Orchestrated job: {} on Executor: {}", DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
+        this.dagManagerMetrics.incrementJobsSentToExecutor(dagNode);
       } catch (Exception e) {
         TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
             getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null;
@@ -1012,9 +969,8 @@ public class DagManager extends AbstractIdleService {
       ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
       log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
       // Only decrement counters and quota for jobs that actually ran on the executor, not from a GaaS side failure/skip event
-      if (quotaManager.releaseQuota(dagNode) && this.metricContext != null) {
-        getRunningJobsCounter(dagNode).dec();
-        getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::dec);
+      if (quotaManager.releaseQuota(dagNode)) {
+        dagManagerMetrics.decrementRunningJobMetrics(dagNode);
       }
 
       switch (jobStatus) {
@@ -1025,6 +981,7 @@ public class DagManager extends AbstractIdleService {
           } else {
             this.failedDagIdsFinishAllPossible.add(dagId);
           }
+          dagManagerMetrics.incrementExecutorFailed(dagNode);
           return Maps.newHashMap();
         case CANCELLED:
           if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
@@ -1034,6 +991,7 @@ public class DagManager extends AbstractIdleService {
           }
           return Maps.newHashMap();
         case COMPLETE:
+          dagManagerMetrics.incrementExecutorSuccess(dagNode);
           return submitNext(dagId);
         default:
           log.warn("It should not reach here. Job status is unexpected.");
@@ -1064,61 +1022,6 @@ public class DagManager extends AbstractIdleService {
       return dagNodes != null && !dagNodes.isEmpty();
     }
 
-    private ContextAwareCounter getRunningJobsCounter(DagNode<JobExecutionPlan> dagNode) {
-      return metricContext.contextAwareCounter(
-          MetricRegistry.name(
-              ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
-              ServiceMetricNames.RUNNING_FLOWS_COUNTER,
-              DagManagerUtils.getSpecExecutorName(dagNode)));
-    }
-
-    private List<ContextAwareCounter> getRunningJobsCounterForUser(DagNode<JobExecutionPlan> dagNode) {
-      Config configs = dagNode.getValue().getJobSpec().getConfig();
-      String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null);
-      List<ContextAwareCounter> counters = new ArrayList<>();
-
-      if (StringUtils.isNotEmpty(proxy)) {
-        counters.add(metricContext.contextAwareCounter(
-            MetricRegistry.name(
-                ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
-                ServiceMetricNames.SERVICE_USERS, proxy)));
-      }
-
-      try {
-        String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
-        if (StringUtils.isNotEmpty(serializedRequesters)) {
-          List<ServiceRequester> requesters = RequesterService.deserialize(serializedRequesters);
-          for (ServiceRequester requester : requesters) {
-            counters.add(metricContext.contextAwareCounter(MetricRegistry
-                .name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, requester.getName())));
-          }
-        }
-      } catch (IOException e) {
-        log.error("Error while fetching requester list.", e);
-      }
-
-      return counters;
-    }
-
-    private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName, Map<String, ContextAwareMeter> meterMap) {
-      String flowGroup = DagManagerUtils.getFlowId(this.dags.get(dagId)).getFlowGroup();
-      return meterMap.computeIfAbsent(flowGroup,
-          group -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, group, meterName)));
-    }
-
-    /**
-     * Used to track metrics for different specExecutors to detect issues with the specExecutor itself
-     * @param dagNode
-     * @param meterName
-     * @param meterMap
-     * @return
-     */
-    private ContextAwareMeter getExecutorMeterForDag(DagNode<JobExecutionPlan> dagNode, String meterName, Map<String, ContextAwareMeter> meterMap) {
-      String executorName = DagManagerUtils.getSpecExecutorName(dagNode);
-      return meterMap.computeIfAbsent(executorName,
-          executorUri -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, executorUri, meterName)));
-    }
-
     /**
      * Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
      */
@@ -1132,24 +1035,27 @@ public class DagManager extends AbstractIdleService {
           DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
           deleteJobState(dagId, dagNode);
         }
-        log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId);
-        onFlowFailure(dagId);
+        Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+        String status = TimingEvent.FlowTimings.FLOW_FAILED;
+        addFailedDag(dagId, dag);
+        log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
         // send an event before cleaning up dag
-        DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), TimingEvent.FlowTimings.FLOW_FAILED);
+        DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), status);
         dagIdstoClean.add(dagId);
       }
 
-      //Clean up completed dags
-      for (String dagId : this.dags.keySet()) {
+      // Remove dags that are finished and emit their appropriate metrics
+      for (Map.Entry<String, Dag<JobExecutionPlan>> dagIdKeyPair : this.dags.entrySet()) {
+        String dagId = dagIdKeyPair.getKey();
+        Dag<JobExecutionPlan> dag = dagIdKeyPair.getValue();
         if (!hasRunningJobs(dagId) && !this.failedDagIdsFinishRunning.contains(dagId)) {
           String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
           if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
-            onFlowFailure(dagId);
             status = TimingEvent.FlowTimings.FLOW_FAILED;
+            addFailedDag(dagId, dag);
             this.failedDagIdsFinishAllPossible.remove(dagId);
-            conditionallyUpdateFlowGaugeExecutionState(flowGauges, DagManagerUtils.getFlowId(this.dags.get(dagId)), FlowState.FAILED);
           } else {
-            onFlowSuccess(dagId);
+            dagManagerMetrics.emitFlowSuccessMetrics(DagManagerUtils.getFlowId(this.dags.get(dagId)));
           }
           log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
           // send an event before cleaning up dag
@@ -1163,27 +1069,11 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
-    private void onFlowSuccess(String dagId) {
-      if (this.metricContext != null) {
-        conditionallyUpdateFlowGaugeExecutionState(flowGauges, DagManagerUtils.getFlowId(this.dags.get(dagId)), FlowState.SUCCESSFUL);
-        this.allSuccessfulMeter.mark();
-        getGroupMeterForDag(dagId, ServiceMetricNames.SUCCESSFUL_FLOW_METER, groupSuccessfulMeters).mark();
-      }
-    }
-
-    private void onFlowFailure(String dagId) {
-      addFailedDag(dagId);
-      if (this.metricContext != null) {
-        conditionallyUpdateFlowGaugeExecutionState(flowGauges, DagManagerUtils.getFlowId(this.dags.get(dagId)), FlowState.FAILED);
-        this.allFailedMeter.mark();
-        getGroupMeterForDag(dagId, ServiceMetricNames.FAILED_FLOW_METER, groupFailureMeters).mark();
-      }
-    }
-
     /**
      * Add a dag to failed dag state store
      */
-    private synchronized void addFailedDag(String dagId) {
+    private synchronized void addFailedDag(String dagId, Dag<JobExecutionPlan> dag) {
+      FlowId flowId = DagManagerUtils.getFlowId(dag);
       try {
         log.info("Adding dag " + dagId + " to failed dag state store");
         this.failedDagStateStore.writeCheckpoint(this.dags.get(dagId));
@@ -1191,6 +1081,12 @@ public class DagManager extends AbstractIdleService {
         log.error("Failed to add dag " + dagId + " to failed dag state store", e);
       }
       this.failedDagIds.add(dagId);
+      if (TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED.equals(dag.getFlowEvent())) {
+        this.dagManagerMetrics.emitFlowSlaExceededMetrics(flowId);
+      } else if (!TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED.equals(dag.getFlowEvent())) {
+        dagManagerMetrics.emitFlowFailedMetrics(flowId);
+      }
+      this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.FAILED);
     }
 
     /**
@@ -1210,18 +1106,6 @@ public class DagManager extends AbstractIdleService {
       this.dags.remove(dagId);
       this.dagToJobs.remove(dagId);
     }
-
-    /**
-     * Updates flowGauges with the appropriate state if the gauge is being tracked for the flow
-      * @param flowGauges
-     * @param flowId
-     * @param state
-     */
-    private void conditionallyUpdateFlowGaugeExecutionState(Map<String, FlowState> flowGauges, FlowId flowId, FlowState state) {
-      if (flowGauges.containsKey(flowId.toString())) {
-        flowGauges.put(flowId.toString(), state);
-      }
-    }
   }
 
   public enum FlowState {
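The per-flow RunningStatus gauge survives the refactor; it is now registered against the RootMetricContext by DagManagerMetrics.registerFlowMetric. A sketch of reading it back, assuming GOBBLIN_SERVICE_PREFIX resolves to "GobblinService" (as the TestServiceMetrics assertions below indicate) and using sample flow coordinates; the -1 reading for a FAILED flow is taken from the DagManagerTest assertions:

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;

public class RunningStatusGaugeSketch {
  public static void main(String[] args) {
    // Gauge name: GobblinService.<flowGroup>.<flowName>.RunningStatus
    String gaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
        "group1", "flow1", ServiceMetricNames.RUNNING_STATUS);
    Gauge<?> gauge = RootMetricContext.get().getGauges().get(gaugeName);
    if (gauge != null) {
      System.out.println(gaugeName + " = " + gauge.getValue()); // e.g. -1 once the flow has FAILED
    }
  }
}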
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java
new file mode 100644 (file)
index 0000000..acf7c03
--- /dev/null
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.metric.filter.MetricNameRegexFilter;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class DagManagerMetrics {
+  private static final Map<String, DagManager.FlowState> flowGauges = Maps.newConcurrentMap();
+  // Meters representing the total number of flows in a given state
+  private ContextAwareMeter allSuccessfulMeter;
+  private ContextAwareMeter allFailedMeter;
+  private ContextAwareMeter allJobsSentToExecutorMeter; // aggregate count of jobs sent to any SpecExecutor
+  private ContextAwareMeter allSlaExceededMeter;
+  private ContextAwareMeter allStartSlaExceededMeter;
+  // Meters representing the flows in a given state per flowgroup
+  private final Map<String, ContextAwareMeter> groupSuccessfulMeters = Maps.newConcurrentMap();
+  private final Map<String, ContextAwareMeter> groupFailureMeters = Maps.newConcurrentMap();
+  private final Map<String, ContextAwareMeter> groupStartSlaExceededMeters = Maps.newConcurrentMap();
+  private final Map<String, ContextAwareMeter> groupSlaExceededMeters = Maps.newConcurrentMap();
+
+  // Meters representing the jobs in a given state per executor
+  // These metrics need to be invoked differently to account for automated retries and multihop scenarios.
+  private final Map<String, ContextAwareMeter> executorSuccessMeters = Maps.newConcurrentMap();
+  private final Map<String, ContextAwareMeter> executorFailureMeters = Maps.newConcurrentMap();
+  private final Map<String, ContextAwareMeter> executorStartSlaExceededMeters = Maps.newConcurrentMap();
+  private final Map<String, ContextAwareMeter> executorSlaExceededMeters = Maps.newConcurrentMap();
+  private final Map<String, ContextAwareMeter> executorJobSentMeters = Maps.newConcurrentMap();
+  private final MetricContext metricContext;
+
+  public DagManagerMetrics(MetricContext metricContext) {
+    this.metricContext = metricContext;
+  }
+
+  public void activate() {
+    if (this.metricContext != null) {
+      allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ServiceMetricNames.SUCCESSFUL_FLOW_METER));
+      allFailedMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ServiceMetricNames.FAILED_FLOW_METER));
+      allStartSlaExceededMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER));
+      allSlaExceededMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER));
+      allJobsSentToExecutorMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+          ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
+    }
+  }
+
+  public void registerFlowMetric(FlowId flowId, Dag<JobExecutionPlan> dag) {
+    // Do not register flow-specific metrics for an adhoc flow
+    if (!flowGauges.containsKey(flowId.toString()) && DagManagerUtils.shouldFlowOutputMetrics(dag)) {
+      String flowStateGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId.getFlowGroup(),
+          flowId.getFlowName(), ServiceMetricNames.RUNNING_STATUS);
+      flowGauges.put(flowId.toString(), DagManager.FlowState.RUNNING);
+      ContextAwareGauge<Integer> gauge = RootMetricContext
+          .get().newContextAwareGauge(flowStateGaugeName, () -> flowGauges.get(flowId.toString()).value);
+      RootMetricContext.get().register(flowStateGaugeName, gauge);
+    }
+  }
+
+  public void incrementRunningJobMetrics(Dag.DagNode<JobExecutionPlan> dagNode) {
+    if (this.metricContext != null) {
+      this.getRunningJobsCounterForExecutor(dagNode).inc();
+      this.getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
+    }
+  }
+
+  public void decrementRunningJobMetrics(Dag.DagNode<JobExecutionPlan> dagNode) {
+    if (this.metricContext != null) {
+      this.getRunningJobsCounterForExecutor(dagNode).dec();
+      this.getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::dec);
+    }
+  }
+
+  /**
+   * Update the tracked state for a flow's RunningStatus gauge, if a gauge is registered for the flow
+   * @param flowId identifier of the flow whose gauge state should be updated
+   * @param state the {@link DagManager.FlowState} to report from the gauge
+   */
+  public void conditionallyMarkFlowAsState(FlowId flowId, DagManager.FlowState state) {
+    if (flowGauges.containsKey(flowId.toString())) {
+      flowGauges.put(flowId.toString(), state);
+    }
+  }
+
+  public void emitFlowSuccessMetrics(FlowId flowId) {
+    if (this.metricContext != null) {
+      this.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.SUCCESSFUL);
+      this.allSuccessfulMeter.mark();
+      this.getGroupMeterForDag(flowId.getFlowGroup(), ServiceMetricNames.SUCCESSFUL_FLOW_METER, groupSuccessfulMeters).mark();
+    }
+  }
+
+  public void emitFlowFailedMetrics(FlowId flowId) {
+    if (this.metricContext != null) {
+      this.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.FAILED);
+      this.allFailedMeter.mark();
+      this.getGroupMeterForDag(flowId.getFlowGroup(), ServiceMetricNames.FAILED_FLOW_METER, groupFailureMeters).mark();
+    }
+  }
+
+  public void emitFlowSlaExceededMetrics(FlowId flowId) {
+    if (this.metricContext != null) {
+      this.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.FAILED);
+      this.allSlaExceededMeter.mark();
+      this.getGroupMeterForDag(flowId.getFlowGroup(), ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER, groupSlaExceededMeters).mark();
+    }
+  }
+
+  public void incrementExecutorSuccess(Dag.DagNode<JobExecutionPlan> node) {
+    if (this.metricContext != null) {
+      this.getExecutorMeterForDag(node, ServiceMetricNames.SUCCESSFUL_FLOW_METER, executorSuccessMeters).mark();
+    }
+  }
+
+  public void incrementExecutorFailed(Dag.DagNode<JobExecutionPlan> node) {
+    if (this.metricContext != null) {
+      this.getExecutorMeterForDag(node, ServiceMetricNames.FAILED_FLOW_METER, executorFailureMeters).mark();
+    }
+  }
+
+  public void incrementExecutorSlaExceeded(Dag.DagNode<JobExecutionPlan> node) {
+    if (this.metricContext != null) {
+      this.getExecutorMeterForDag(node, ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER, executorSlaExceededMeters).mark();
+    }
+  }
+
+  public void incrementJobsSentToExecutor(Dag.DagNode<JobExecutionPlan> node) {
+    if (this.metricContext != null) {
+      this.getExecutorMeterForDag(node, ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR, executorJobSentMeters).mark();
+      this.allJobsSentToExecutorMeter.mark();
+    }
+  }
+
+  // Increment the start-SLA counts during flow submission rather than at cleanup, so that retries do not obscure this metric
+  public void incrementCountsStartSlaExceeded(Dag.DagNode<JobExecutionPlan> node) {
+    String flowGroup = node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    if (this.metricContext != null) {
+      this.getGroupMeterForDag(flowGroup, ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER, groupStartSlaExceededMeters).mark();
+      this.allStartSlaExceededMeter.mark();
+      this.getExecutorMeterForDag(node, ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER, executorStartSlaExceededMeters).mark();
+    }
+  }
+
+  private List<ContextAwareCounter> getRunningJobsCounterForUser(Dag.DagNode<JobExecutionPlan> dagNode) {
+    Config configs = dagNode.getValue().getJobSpec().getConfig();
+    String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null);
+    List<ContextAwareCounter> counters = new ArrayList<>();
+
+    if (StringUtils.isNotEmpty(proxy)) {
+      counters.add(this.metricContext.contextAwareCounter(
+          MetricRegistry.name(
+              ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+              ServiceMetricNames.SERVICE_USERS, proxy)));
+    }
+
+    try {
+      String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+      if (StringUtils.isNotEmpty(serializedRequesters)) {
+        List<ServiceRequester> requesters = RequesterService.deserialize(serializedRequesters);
+        for (ServiceRequester requester : requesters) {
+          counters.add(this.metricContext.contextAwareCounter(MetricRegistry
+              .name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, requester.getName())));
+        }
+      }
+    } catch (IOException e) {
+      log.error("Error while fetching requester list.", e);
+    }
+
+    return counters;
+  }
+
+  private ContextAwareCounter getRunningJobsCounterForExecutor(Dag.DagNode<JobExecutionPlan> dagNode) {
+    return this.metricContext.contextAwareCounter(
+        MetricRegistry.name(
+            ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+            DagManagerUtils.getSpecExecutorName(dagNode),
+            ServiceMetricNames.RUNNING_FLOWS_COUNTER));
+  }
+
+
+  private ContextAwareMeter getGroupMeterForDag(String flowGroup, String meterName, Map<String, ContextAwareMeter> meterMap) {
+    return meterMap.computeIfAbsent(flowGroup,
+        group -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, group, meterName)));
+  }
+
+  /**
+   * Looks up (creating lazily) the meter for the given dag node's SpecExecutor, so that issues with a specific executor can be detected
+   * @param dagNode dag node whose SpecExecutor name keys the meter
+   * @param meterName name of the metric being metered (e.g. {@link ServiceMetricNames#FAILED_FLOW_METER})
+   * @param meterMap cache of meters keyed by executor name
+   * @return the executor-scoped {@link ContextAwareMeter}
+   */
+  private ContextAwareMeter getExecutorMeterForDag(Dag.DagNode<JobExecutionPlan> dagNode, String meterName, Map<String, ContextAwareMeter> meterMap) {
+    String executorName = DagManagerUtils.getSpecExecutorName(dagNode);
+    return meterMap.computeIfAbsent(executorName,
+        executorUri -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, executorUri, meterName)));
+  }
+
+
+  @VisibleForTesting
+  protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
+    return new MetricNameRegexFilter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "\\..*\\." + ServiceMetricNames.RUNNING_STATUS);
+  }
+
+  public void cleanup() {
+    // The DagManagerThread's metric mappings follow the lifecycle of the thread itself, so they are lost across DagManager deactivation-reactivation, while the RootMetricContext is a persistent singleton.
+    // To avoid an IllegalArgumentException from the RootMetricContext rejecting the (re-)addition of an already-registered metric, remove every metric a new DagManagerThread would attempt to add (in DagManagerThread::initialize) once the DagManager is re-enabled.
+    RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
+  }
+}
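Putting the class above in the context of the DagManager lifecycle: construct once with a (possibly null) MetricContext, activate() when the DagManager becomes the active instance, mark meters from the worker threads, and cleanup() on deactivation so the RunningStatus gauges can be re-registered later. A hedged usage sketch; the context construction mirrors the DagManager constructor, the FlowId values are samples, and dag-node-based calls are omitted since building a Dag<JobExecutionPlan> needs more scaffolding:

import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
import org.apache.gobblin.util.ConfigUtils;

public class DagManagerMetricsUsageSketch {
  public static void main(String[] args) {
    MetricContext metricContext =
        Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), DagManagerMetricsUsageSketch.class);
    DagManagerMetrics metrics = new DagManagerMetrics(metricContext);
    metrics.activate(); // creates the aggregate success/failed/SLA meters

    FlowId flowId = new FlowId().setFlowGroup("group0").setFlowName("flow0");
    metrics.emitFlowSuccessMetrics(flowId); // marks GobblinService.SuccessfulFlows and the per-group meter
    metrics.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.SUCCESSFUL); // only if a gauge was registered

    metrics.cleanup(); // removes the RunningStatus gauges from the RootMetricContext
  }
}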
index 216e28525dea90153645dc61707f39944d853533..b414a76e690a68cf2dbfa6ec04e2b87a90035e60 100644 (file)
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -71,6 +72,7 @@ public class DagManagerTest {
   private DagStateStore _dagStateStore;
   private DagStateStore _failedDagStateStore;
   private JobStatusRetriever _jobStatusRetriever;
+  private DagManagerMetrics _dagManagerMetrics;
   private DagManager.DagManagerThread _dagManagerThread;
   private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue;
   private LinkedBlockingQueue<String> cancelQueue;
@@ -99,9 +101,10 @@ public class DagManagerTest {
     Config quotaConfig = ConfigFactory.empty()
         .withValue(UserQuotaManager.PER_USER_QUOTA, ConfigValueFactory.fromAnyRef("user:1"));
     this._gobblinServiceQuotaManager = new UserQuotaManager(quotaConfig);
+    this._dagManagerMetrics = new DagManagerMetrics(metricContext);
+    this._dagManagerMetrics.activate();
     this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, _failedDagStateStore, queue, cancelQueue,
-        resumeQueue, true, new HashSet<>(), metricContext.contextAwareMeter("successMeter"),
-        metricContext.contextAwareMeter("failedMeter"), new HashMap<>(), START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
+        resumeQueue, true, new HashSet<>(), this._dagManagerMetrics, START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
 
     Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);
@@ -703,15 +706,15 @@ public class DagManagerTest {
     // The start time should be 16 minutes ago, which is past the start SLA so the job should be cancelled
     Iterator<JobStatus> jobStatusIterator1 =
         getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, flowGroup, String.valueOf(ExecutionStatus.ORCHESTRATED),
-            false, flowExecutionId - 16 * 60 * 1000);
+            false, flowExecutionId - Duration.ofMinutes(16).toMillis());
     // This is for the second Dag that does not match the SLA so should schedule normally
     Iterator<JobStatus> jobStatusIterator2 =
         getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0, flowGroup1, String.valueOf(ExecutionStatus.ORCHESTRATED),
-            false, flowExecutionId - 10 * 60 * 1000);
+            false, flowExecutionId - Duration.ofMinutes(10).toMillis());
     // Let the first job get reported as cancel due to SLA kill on start and clean up
     Iterator<JobStatus> jobStatusIterator3 =
         getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, flowGroup, String.valueOf(ExecutionStatus.CANCELLED),
-            false, flowExecutionId - 16 * 60 * 1000);
+            false, flowExecutionId - Duration.ofMinutes(16).toMillis());
     // Cleanup the running job that is scheduled normally
     Iterator<JobStatus> jobStatusIterator4 =
         getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0, flowGroup1, String.valueOf(ExecutionStatus.COMPLETE));
@@ -751,6 +754,8 @@ public class DagManagerTest {
   @Test (dependsOnMethods = "testJobStartSLAKilledDag")
   public void testJobKilledSLAMetricsArePerExecutor() throws URISyntaxException, IOException {
     long flowExecutionId = System.currentTimeMillis();
+    // The start time should be 16 minutes ago, which is past the start SLA so the job should be cancelled
+    long startOrchestrationTime = flowExecutionId - Duration.ofMinutes(16).toMillis();
     Config executorOneConfig = ConfigFactory.empty()
         .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorOne"))
         .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId));
@@ -758,20 +763,23 @@ public class DagManagerTest {
     List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "user", executorOneConfig);
     dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "user", executorTwoConfig));
 
+    String allStartSlaKilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+    long previousStartSlaKilledCount = metricContext.getParent().get().getMeters().get(allStartSlaKilledMeterName) == null ? 0 :
+        metricContext.getParent().get().getMeters().get(allStartSlaKilledMeterName).getCount();
+
     //Add a dag to the queue of dags
     this.queue.offer(dagList.get(0));
     this.queue.offer(dagList.get(1));
     this.queue.offer(dagList.get(2));;
-    // The start time should be 16 minutes ago, which is past the start SLA so the job should be cancelled
     Iterator<JobStatus> jobStatusIterator1 =
         getMockJobStatus("flow0", "group0", flowExecutionId, "job0", "group0", String.valueOf(ExecutionStatus.ORCHESTRATED),
-            false, flowExecutionId - 16 * 60 * 1000);
+            false, startOrchestrationTime);
     Iterator<JobStatus> jobStatusIterator2 =
         getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0", "group1", String.valueOf(ExecutionStatus.ORCHESTRATED),
-            false, flowExecutionId - 16 * 60 * 1000);
+            false, startOrchestrationTime);
     Iterator<JobStatus> jobStatusIterator3 =
         getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0", "group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
-            false, flowExecutionId - 16 * 60 * 1000);
+            false, startOrchestrationTime);
 
     Mockito.when(_jobStatusRetriever
         .getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
@@ -779,15 +787,18 @@ public class DagManagerTest {
         thenReturn(jobStatusIterator2).
         thenReturn(jobStatusIterator3);
 
-    // Run the thread once. All 3 jobs should be emitted an sla exceeded event
+    // Run the thread once. All 3 jobs should emit a start-SLA exceeded event
     this._dagManagerThread.run();
 
     String slakilledMeterName1 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
     String slakilledMeterName2 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+
     Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(), 2);
     Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(), 1);
     // Cleanup
     this._dagManagerThread.run();
+    Assert.assertEquals(metricContext.getParent().get().getMeters().get(allStartSlaKilledMeterName).getCount(), previousStartSlaKilledCount + 3);
+
     Assert.assertEquals(this.dags.size(), 0);
     Assert.assertEquals(this.jobToDag.size(), 0);
     Assert.assertEquals(this.dagToJobs.size(), 0);
@@ -1053,6 +1064,157 @@ public class DagManagerTest {
     Assert.assertEquals(metricContext.getParent().get().getGauges().get(flowStateGaugeName1).getValue(), DagManager.FlowState.SUCCESSFUL.value);
   }
 
+  @Test (dependsOnMethods = "testEmitFlowMetricOnlyIfNotAdhoc")
+  public void testJobSlaKilledMetrics() throws URISyntaxException, IOException {
+    long flowExecutionId = System.currentTimeMillis() - Duration.ofMinutes(20).toMillis();
+    Config executorOneConfig = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorOne"))
+        .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
+        .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10));
+    Config executorTwoConfig = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorTwo"))
+        .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10));
+    List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "newUser", executorOneConfig);
+    dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "newUser", executorTwoConfig));
+
+    String allSlaKilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
+    long previousSlaKilledCount = metricContext.getParent().get().getMeters().get(allSlaKilledMeterName) == null ? 0 :
+        metricContext.getParent().get().getMeters().get(allSlaKilledMeterName).getCount();
+
+    //Add a dag to the queue of dags
+    this.queue.offer(dagList.get(0));
+    this.queue.offer(dagList.get(1));
+    this.queue.offer(dagList.get(2));
+    // Orchestration time is set 20 minutes in the past, so the jobs should be marked as SLA killed
+    Iterator<JobStatus> jobStatusIterator1 =
+        getMockJobStatus("flow0", "group0", flowExecutionId, "job0", "group0", String.valueOf(ExecutionStatus.RUNNING),
+            false, flowExecutionId);
+    Iterator<JobStatus> jobStatusIterator2 =
+        getMockJobStatus("flow1", "flow1", flowExecutionId, "job0", "group1", String.valueOf(ExecutionStatus.RUNNING),
+            false, flowExecutionId);
+    Iterator<JobStatus> jobStatusIterator3 =
+        getMockJobStatus("flow2", "flow2", flowExecutionId, "job0", "group2", String.valueOf(ExecutionStatus.RUNNING),
+            false, flowExecutionId);
+    Iterator<JobStatus> jobStatusIterator4 =
+        getMockJobStatus("flow0", "flow0", flowExecutionId, "job0", "group0", String.valueOf(ExecutionStatus.CANCELLED),
+            false, flowExecutionId);
+    Iterator<JobStatus> jobStatusIterator5 =
+        getMockJobStatus("flow1", "flow1", flowExecutionId, "job0", "group1", String.valueOf(ExecutionStatus.CANCELLED),
+            false, flowExecutionId);
+    Iterator<JobStatus> jobStatusIterator6 =
+        getMockJobStatus("flow2", "flow2", flowExecutionId, "job0", "group2", String.valueOf(ExecutionStatus.CANCELLED),
+            false, flowExecutionId);
+
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+        thenReturn(jobStatusIterator1).
+        thenReturn(jobStatusIterator2).
+        thenReturn(jobStatusIterator3).
+        thenReturn(jobStatusIterator4).
+        thenReturn(jobStatusIterator5).
+        thenReturn(jobStatusIterator6);
+
+    // Run the thread twice. All 3 jobs should emit an SLA exceeded event and then be cancelled
+    this._dagManagerThread.run();
+    this._dagManagerThread.run();
+
+    String slakilledMeterName1 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
+    String slakilledMeterName2 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
+    String failedFlowGauge = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group1", "flow1", ServiceMetricNames.RUNNING_STATUS);
+
+    String slakilledGroupName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
+    Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(), 2);
+    Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(), 1);
+    Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledGroupName).getCount(), 1);
+    // Cleanup
+    this._dagManagerThread.run();
+    Assert.assertEquals(metricContext.getParent().get().getMeters().get(allSlaKilledMeterName).getCount(), previousSlaKilledCount + 3);
+    Assert.assertEquals(metricContext.getParent().get().getGauges().get(failedFlowGauge).getValue(), -1);
+
+    Assert.assertEquals(this.dags.size(), 0);
+    Assert.assertEquals(this.jobToDag.size(), 0);
+    Assert.assertEquals(this.dagToJobs.size(), 0);
+  }
+
+  @Test (dependsOnMethods = "testJobSlaKilledMetrics")
+  public void testPerExecutorMetricsSuccessFails() throws URISyntaxException, IOException {
+    long flowExecutionId = System.currentTimeMillis();
+    Config executorOneConfig = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorOne"))
+        .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId));
+    Config executorTwoConfig = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorTwo"));
+    List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "newUser", executorOneConfig);
+    dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "newUser", executorTwoConfig));
+    // Get global metric count before any changes are applied
+    String allSuccessfulFlowsMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SUCCESSFUL_FLOW_METER);
+    long previousSuccessCount = metricContext.getParent().get().getMeters().get(allSuccessfulFlowsMeterName) == null ? 0 :
+        metricContext.getParent().get().getMeters().get(allSuccessfulFlowsMeterName).getCount();
+    String previousJobSentToExecutorMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne", ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR);
+    long previousJobSentToExecutorCount = metricContext.getParent().get().getMeters().get(previousJobSentToExecutorMeterName) == null ? 0 :
+        metricContext.getParent().get().getMeters().get(previousJobSentToExecutorMeterName).getCount();
+
+    //Add a dag to the queue of dags
+    this.queue.offer(dagList.get(0));
+    this.queue.offer(dagList.get(1));
+    this.queue.offer(dagList.get(2));
+    // Job start times are current, so no start SLA is violated; each job is ORCHESTRATED on the first poll and reaches a terminal status on the next
+    Iterator<JobStatus> jobStatusIterator1 =
+        getMockJobStatus( "flow0", "group0", flowExecutionId, "job0", "group0", String.valueOf(ExecutionStatus.ORCHESTRATED),
+            false, flowExecutionId);
+    Iterator<JobStatus> jobStatusIterator2 =
+        getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0", "group1", String.valueOf(ExecutionStatus.ORCHESTRATED),
+            false, flowExecutionId);
+    Iterator<JobStatus> jobStatusIterator3 =
+        getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0", "group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
+            false, flowExecutionId);
+    Iterator<JobStatus> jobStatusIterator4 =
+        getMockJobStatus( "flow0", "flow0", flowExecutionId+1, "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE),
+            false, flowExecutionId);
+    Iterator<JobStatus> jobStatusIterator5 =
+        getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0", "group1", String.valueOf(ExecutionStatus.FAILED),
+            false, flowExecutionId);
+    Iterator<JobStatus> jobStatusIterator6 =
+        getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0", "group2", String.valueOf(ExecutionStatus.COMPLETE),
+            false, flowExecutionId);
+
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.eq("flow0"), Mockito.eq("group0"), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+        thenReturn(jobStatusIterator1).
+        thenReturn(jobStatusIterator4);
+
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.eq("flow1"), Mockito.eq("group1"), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+        thenReturn(jobStatusIterator2).
+        thenReturn(jobStatusIterator5);
+
+    Mockito.when(_jobStatusRetriever
+        .getJobStatusesForFlowExecution(Mockito.eq("flow2"), Mockito.eq("group2"), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+        thenReturn(jobStatusIterator3).
+        thenReturn(jobStatusIterator6);
+
+    this._dagManagerThread.run();
+
+    String successfulFlowsExecutorOneMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne", ServiceMetricNames.SUCCESSFUL_FLOW_METER);
+    String failedFlowsExecutorOneMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne", ServiceMetricNames.FAILED_FLOW_METER);
+    String failedFlowGauge = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group1", "flow1", ServiceMetricNames.RUNNING_STATUS);
+
+    this._dagManagerThread.run();
+
+    Assert.assertEquals(metricContext.getParent().get().getMeters().get(successfulFlowsExecutorOneMeterName).getCount(), 1);
+    Assert.assertEquals(metricContext.getParent().get().getMeters().get(failedFlowsExecutorOneMeterName).getCount(), 1);
+    Assert.assertEquals(metricContext.getParent().get().getMeters().get(allSuccessfulFlowsMeterName).getCount(), previousSuccessCount + 2);
+    Assert.assertEquals(metricContext.getParent().get().getMeters().get(previousJobSentToExecutorMeterName).getCount(), previousJobSentToExecutorCount + 2);
+    Assert.assertEquals(metricContext.getParent().get().getGauges().get(failedFlowGauge).getValue(), -1);
+    // Cleanup
+    this._dagManagerThread.run();
+
+    Assert.assertEquals(this.dags.size(), 0);
+    Assert.assertEquals(this.jobToDag.size(), 0);
+    Assert.assertEquals(this.dagToJobs.size(), 0);
+  }
+
+
 
   @AfterClass
   public void cleanUp() throws Exception {
index 42113829b9fa0d50bdc8586c830233fb81ec80ab..67188e1e810c2016f397215d68f9e087d3bfc0d8 100644 (file)
@@ -29,7 +29,7 @@ public class TestServiceMetrics {
   @Test
   public void matchesTest() {
 
-    MetricNameRegexFilter metricNameRegexForDagManager = DagManager.getMetricsFilterForDagManager();
+    MetricNameRegexFilter metricNameRegexForDagManager = DagManagerMetrics.getMetricsFilterForDagManager();
     Assert.assertTrue(metricNameRegexForDagManager.matches("GobblinService.testGroup.testFlow.RunningStatus", mock(Metric.class)));
     Assert.assertTrue(metricNameRegexForDagManager.matches("GobblinService.test..RunningStatus", mock(Metric.class)));
     Assert.assertFalse(metricNameRegexForDagManager.matches("test3.RunningStatus", mock(Metric.class)));
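The filter relocated above wraps the regex GobblinService\..*\.RunningStatus, which requires a (possibly empty) group-and-flow segment between the service prefix and RunningStatus. A standalone check of the same pattern with java.util.regex, independent of the Gobblin classes, reproducing the three assertions in the test:

import java.util.regex.Pattern;

public class RunningStatusRegexSketch {
  public static void main(String[] args) {
    Pattern p = Pattern.compile("GobblinService\\..*\\.RunningStatus");
    System.out.println(p.matcher("GobblinService.testGroup.testFlow.RunningStatus").matches()); // true
    System.out.println(p.matcher("GobblinService.test..RunningStatus").matches());              // true: .* may be empty
    System.out.println(p.matcher("test3.RunningStatus").matches());                             // false: wrong prefix
  }
}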