* Refactor metrics from DagManager into its own class, add metrics per executor for SUCCESS, FAILED, SLA_EXCEEDED, START_SLA_EXCEEDED
* Address review, fix flow gauge for failed flows
public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow";
public static final String SUCCESSFUL_FLOW_METER = "SuccessfulFlows";
public static final String START_SLA_EXCEEDED_FLOWS_METER = "StartSLAExceededFlows";
+ public static final String SLA_EXCEEDED_FLOWS_METER = "SlaExceededFlows";
public static final String FAILED_FLOW_METER = "FailedFlows";
public static final String SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX + ".ScheduledFlows";
public static final String NON_SCHEDULED_FLOW_METER = GOBBLIN_SERVICE_PREFIX + ".NonScheduledFlows";
public static final String SERVICE_USERS = "ServiceUsers";
public static final String COMPILED = "Compiled";
public static final String RUNNING_STATUS = "RunningStatus";
+ public static final String JOBS_SENT_TO_SPEC_EXECUTOR = "JobsSentToSpecExecutor";
public static final String HELIX_LEADER_STATE = "HelixLeaderState";
}
package org.apache.gobblin.service.modules.orchestration;
import com.codahale.metrics.Meter;
-import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.lang3.StringUtils;
-
-import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.metrics.metric.filter.MetricNameRegexFilter;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
-import org.apache.gobblin.service.RequesterService;
-import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
private final Config config;
private final Optional<EventSubmitter> eventSubmitter;
private final long failedDagRetentionTime;
+ private final DagManagerMetrics dagManagerMetrics;
private volatile boolean isActive = false;
this.pollingInterval = ConfigUtils.getInt(config, JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
this.retentionPollingInterval = ConfigUtils.getInt(config, FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
this.instrumentationEnabled = instrumentationEnabled;
+ MetricContext metricContext = null;
if (instrumentationEnabled) {
- MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+ metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.eventSubmitter = Optional.of(new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
} else {
this.eventSubmitter = Optional.absent();
}
+ this.dagManagerMetrics = new DagManagerMetrics(metricContext);
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
topologySpecMap);
Set<String> failedDagIds = Collections.synchronizedSet(failedDagStateStore.getDagIds());
- ContextAwareMeter allSuccessfulMeter = null;
- ContextAwareMeter allFailedMeter = null;
- // TODO: create a map of RatioGauges for success/failed per executor, will require preprocessing of available executors
- Map<String, ContextAwareMeter> startSlaExceededMeters = Maps.newConcurrentMap();
-
- if (instrumentationEnabled) {
- MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
- allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
- ServiceMetricNames.SUCCESSFUL_FLOW_METER));
- allFailedMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
- ServiceMetricNames.FAILED_FLOW_METER));
- }
+ this.dagManagerMetrics.activate();
UserQuotaManager quotaManager = new UserQuotaManager(config);
// Before initializing the DagManagerThreads check which dags are currently running before shutdown
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
- runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, failedDagIds, allSuccessfulMeter,
- allFailedMeter, startSlaExceededMeters, this.defaultJobStartSlaTimeMillis, quotaManager, i);
+ runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, failedDagIds, this.dagManagerMetrics,
+ this.defaultJobStartSlaTimeMillis, quotaManager, i);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
}
} else { //Mark the DagManager inactive.
log.info("Inactivating the DagManager. Shutting down all DagManager threads");
this.scheduledExecutorPool.shutdown();
- // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
- // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
- RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
+ this.dagManagerMetrics.cleanup();
try {
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
}
- @VisibleForTesting
- protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
- return new MetricNameRegexFilter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "\\..*\\." + ServiceMetricNames.RUNNING_STATUS);
- }
-
/**
* Each {@link DagManagerThread} performs 2 actions when scheduled:
* <ol>
private final Optional<EventSubmitter> eventSubmitter;
private final Optional<Timer> jobStatusPolledTimer;
private final AtomicLong orchestrationDelay = new AtomicLong(0);
- private static final Map<String, FlowState> flowGauges = Maps.newConcurrentMap();
- private final ContextAwareMeter allSuccessfulMeter;
- private final ContextAwareMeter allFailedMeter;
- private static final Map<String, ContextAwareMeter> groupSuccessfulMeters = Maps.newConcurrentMap();
- private static final Map<String, ContextAwareMeter> groupFailureMeters = Maps.newConcurrentMap();
- private final Map<String, ContextAwareMeter> startSlaExceededMeters;
+ private final DagManagerMetrics dagManagerMetrics;
private final UserQuotaManager quotaManager;
private final JobStatusRetriever jobStatusRetriever;
private final DagStateStore dagStateStore;
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
- boolean instrumentationEnabled, Set<String> failedDagIds, ContextAwareMeter allSuccessfulMeter,
- ContextAwareMeter allFailedMeter, Map<String, ContextAwareMeter> startSlaExceededMeters,
+ boolean instrumentationEnabled, Set<String> failedDagIds, DagManagerMetrics dagManagerMetrics,
Long defaultJobStartSla, UserQuotaManager quotaManager, int dagMangerThreadId) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.queue = queue;
this.cancelQueue = cancelQueue;
this.resumeQueue = resumeQueue;
- this.allSuccessfulMeter = allSuccessfulMeter;
- this.allFailedMeter = allFailedMeter;
+ this.dagManagerMetrics = dagManagerMetrics;
this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
this.quotaManager = quotaManager;
- this.startSlaExceededMeters = startSlaExceededMeters;
if (instrumentationEnabled) {
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
addJobState(dagId, dagNode);
//Update the running jobs counter.
- getRunningJobsCounter(dagNode).inc();
- getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
+ dagManagerMetrics.incrementRunningJobMetrics(dagNode);
isDagRunning = true;
}
}
FlowId flowId = DagManagerUtils.getFlowId(dag);
- // Do not register flow-specific metrics for a flow
- if (!flowGauges.containsKey(flowId.toString()) && DagManagerUtils.shouldFlowOutputMetrics(dag)) {
- String flowStateGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId.getFlowGroup(),
- flowId.getFlowName(), ServiceMetricNames.RUNNING_STATUS);
- flowGauges.put(flowId.toString(), FlowState.RUNNING);
- ContextAwareGauge<Integer> gauge = RootMetricContext
- .get().newContextAwareGauge(flowStateGaugeName, () -> flowGauges.get(flowId.toString()).value);
- RootMetricContext.get().register(flowStateGaugeName, gauge);
- }
+ this.dagManagerMetrics.registerFlowMetric(flowId, dag);
log.debug("Dag {} submitting jobs ready for execution.", DagManagerUtils.getFullyQualifiedDagName(dag));
//Determine the next set of jobs to run and submit them for execution
// Set flow status to running
DagManagerUtils.emitFlowEvent(this.eventSubmitter, dag, TimingEvent.FlowTimings.FLOW_RUNNING);
- conditionallyUpdateFlowGaugeExecutionState(flowGauges, flowId, FlowState.RUNNING);
+ dagManagerMetrics.conditionallyMarkFlowAsState(flowId, FlowState.RUNNING);
// Report the orchestration delay the first time the Dag is initialized. Orchestration delay is defined as
// the time difference between the instant when a flow first transitions to the running state and the instant
DagManagerUtils.getJobName(node),
DagManagerUtils.getFullyQualifiedDagName(node),
timeOutForJobStart);
+ dagManagerMetrics.incrementCountsStartSlaExceeded(node);
cancelDagNode(node);
String dagId = DagManagerUtils.generateDagId(node);
this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED);
this.dags.get(dagId).setMessage("Flow killed because no update received for " + timeOutForJobStart + " ms after orchestration");
- if (this.metricContext != null) {
- this.getExecutorMeterForDag(node, ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER, startSlaExceededMeters).mark();
- }
return true;
} else {
return false;
log.info("Flow {} exceeded the SLA of {} ms. Killing the job {} now...",
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), flowSla,
node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+ dagManagerMetrics.incrementExecutorSlaExceeded(node);
cancelDagNode(node);
this.dags.get(dagId).setFlowEvent(TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED);
// Quota release is guaranteed, despite failure, because exception handling within would mark the job FAILED.
// When the ensuing kafka message spurs DagManager processing, the quota is released and the counts decremented
// Ensure that we do not double increment for flows that are retried
- if (this.metricContext != null && dagNode.getValue().getCurrentAttempts() == 1) {
- getRunningJobsCounter(dagNode).inc();
- getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
+ if (dagNode.getValue().getCurrentAttempts() == 1) {
+ dagManagerMetrics.incrementRunningJobMetrics(dagNode);
}
// Submit the job to the SpecProducer, which in turn performs the actual job submission to the SpecExecutor instance.
// The SpecProducer implementations submit the job to the underlying executor and return when the submission is complete,
jobOrchestrationTimer.stop(jobMetadata);
}
log.info("Orchestrated job: {} on Executor: {}", DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
+ this.dagManagerMetrics.incrementJobsSentToExecutor(dagNode);
} catch (Exception e) {
TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null;
ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
// Only decrement counters and quota for jobs that actually ran on the executor, not from a GaaS side failure/skip event
- if (quotaManager.releaseQuota(dagNode) && this.metricContext != null) {
- getRunningJobsCounter(dagNode).dec();
- getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::dec);
+ if (quotaManager.releaseQuota(dagNode)) {
+ dagManagerMetrics.decrementRunningJobMetrics(dagNode);
}
switch (jobStatus) {
} else {
this.failedDagIdsFinishAllPossible.add(dagId);
}
+ dagManagerMetrics.incrementExecutorFailed(dagNode);
return Maps.newHashMap();
case CANCELLED:
if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
}
return Maps.newHashMap();
case COMPLETE:
+ dagManagerMetrics.incrementExecutorSuccess(dagNode);
return submitNext(dagId);
default:
log.warn("It should not reach here. Job status is unexpected.");
return dagNodes != null && !dagNodes.isEmpty();
}
- private ContextAwareCounter getRunningJobsCounter(DagNode<JobExecutionPlan> dagNode) {
- return metricContext.contextAwareCounter(
- MetricRegistry.name(
- ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
- ServiceMetricNames.RUNNING_FLOWS_COUNTER,
- DagManagerUtils.getSpecExecutorName(dagNode)));
- }
-
- private List<ContextAwareCounter> getRunningJobsCounterForUser(DagNode<JobExecutionPlan> dagNode) {
- Config configs = dagNode.getValue().getJobSpec().getConfig();
- String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null);
- List<ContextAwareCounter> counters = new ArrayList<>();
-
- if (StringUtils.isNotEmpty(proxy)) {
- counters.add(metricContext.contextAwareCounter(
- MetricRegistry.name(
- ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
- ServiceMetricNames.SERVICE_USERS, proxy)));
- }
-
- try {
- String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
- if (StringUtils.isNotEmpty(serializedRequesters)) {
- List<ServiceRequester> requesters = RequesterService.deserialize(serializedRequesters);
- for (ServiceRequester requester : requesters) {
- counters.add(metricContext.contextAwareCounter(MetricRegistry
- .name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, requester.getName())));
- }
- }
- } catch (IOException e) {
- log.error("Error while fetching requester list.", e);
- }
-
- return counters;
- }
-
- private ContextAwareMeter getGroupMeterForDag(String dagId, String meterName, Map<String, ContextAwareMeter> meterMap) {
- String flowGroup = DagManagerUtils.getFlowId(this.dags.get(dagId)).getFlowGroup();
- return meterMap.computeIfAbsent(flowGroup,
- group -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, group, meterName)));
- }
-
- /**
- * Used to track metrics for different specExecutors to detect issues with the specExecutor itself
- * @param dagNode
- * @param meterName
- * @param meterMap
- * @return
- */
- private ContextAwareMeter getExecutorMeterForDag(DagNode<JobExecutionPlan> dagNode, String meterName, Map<String, ContextAwareMeter> meterMap) {
- String executorName = DagManagerUtils.getSpecExecutorName(dagNode);
- return meterMap.computeIfAbsent(executorName,
- executorUri -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, executorUri, meterName)));
- }
-
/**
* Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
*/
DagNode<JobExecutionPlan> dagNode = dagNodeList.poll();
deleteJobState(dagId, dagNode);
}
- log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId);
- onFlowFailure(dagId);
+ Dag<JobExecutionPlan> dag = this.dags.get(dagId);
+ String status = TimingEvent.FlowTimings.FLOW_FAILED;
+ addFailedDag(dagId, dag);
+ log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
// send an event before cleaning up dag
- DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), TimingEvent.FlowTimings.FLOW_FAILED);
+ DagManagerUtils.emitFlowEvent(this.eventSubmitter, this.dags.get(dagId), status);
dagIdstoClean.add(dagId);
}
- //Clean up completed dags
- for (String dagId : this.dags.keySet()) {
+ // Remove dags that are finished and emit their appropriate metrics
+ for (Map.Entry<String, Dag<JobExecutionPlan>> dagIdKeyPair : this.dags.entrySet()) {
+ String dagId = dagIdKeyPair.getKey();
+ Dag<JobExecutionPlan> dag = dagIdKeyPair.getValue();
if (!hasRunningJobs(dagId) && !this.failedDagIdsFinishRunning.contains(dagId)) {
String status = TimingEvent.FlowTimings.FLOW_SUCCEEDED;
if (this.failedDagIdsFinishAllPossible.contains(dagId)) {
- onFlowFailure(dagId);
status = TimingEvent.FlowTimings.FLOW_FAILED;
+ addFailedDag(dagId, dag);
this.failedDagIdsFinishAllPossible.remove(dagId);
- conditionallyUpdateFlowGaugeExecutionState(flowGauges, DagManagerUtils.getFlowId(this.dags.get(dagId)), FlowState.FAILED);
} else {
- onFlowSuccess(dagId);
+ dagManagerMetrics.emitFlowSuccessMetrics(DagManagerUtils.getFlowId(this.dags.get(dagId)));
}
log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
// send an event before cleaning up dag
}
}
- private void onFlowSuccess(String dagId) {
- if (this.metricContext != null) {
- conditionallyUpdateFlowGaugeExecutionState(flowGauges, DagManagerUtils.getFlowId(this.dags.get(dagId)), FlowState.SUCCESSFUL);
- this.allSuccessfulMeter.mark();
- getGroupMeterForDag(dagId, ServiceMetricNames.SUCCESSFUL_FLOW_METER, groupSuccessfulMeters).mark();
- }
- }
-
- private void onFlowFailure(String dagId) {
- addFailedDag(dagId);
- if (this.metricContext != null) {
- conditionallyUpdateFlowGaugeExecutionState(flowGauges, DagManagerUtils.getFlowId(this.dags.get(dagId)), FlowState.FAILED);
- this.allFailedMeter.mark();
- getGroupMeterForDag(dagId, ServiceMetricNames.FAILED_FLOW_METER, groupFailureMeters).mark();
- }
- }
-
/**
* Add a dag to failed dag state store
*/
- private synchronized void addFailedDag(String dagId) {
+ private synchronized void addFailedDag(String dagId, Dag<JobExecutionPlan> dag) {
+ FlowId flowId = DagManagerUtils.getFlowId(dag);
try {
log.info("Adding dag " + dagId + " to failed dag state store");
this.failedDagStateStore.writeCheckpoint(this.dags.get(dagId));
log.error("Failed to add dag " + dagId + " to failed dag state store", e);
}
this.failedDagIds.add(dagId);
+ if (TimingEvent.FlowTimings.FLOW_RUN_DEADLINE_EXCEEDED.equals(dag.getFlowEvent())) {
+ this.dagManagerMetrics.emitFlowSlaExceededMetrics(flowId);
+ } else if (!TimingEvent.FlowTimings.FLOW_START_DEADLINE_EXCEEDED.equals(dag.getFlowEvent())) {
+ dagManagerMetrics.emitFlowFailedMetrics(flowId);
+ }
+ this.dagManagerMetrics.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.FAILED);
}
/**
this.dags.remove(dagId);
this.dagToJobs.remove(dagId);
}
-
- /**
- * Updates flowGauges with the appropriate state if the gauge is being tracked for the flow
- * @param flowGauges
- * @param flowId
- * @param state
- */
- private void conditionallyUpdateFlowGaugeExecutionState(Map<String, FlowState> flowGauges, FlowId flowId, FlowState state) {
- if (flowGauges.containsKey(flowId.toString())) {
- flowGauges.put(flowId.toString(), state);
- }
- }
}
public enum FlowState {
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.metric.filter.MetricNameRegexFilter;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public class DagManagerMetrics {
+ private static final Map<String, DagManager.FlowState> flowGauges = Maps.newConcurrentMap();
+ // Meters representing the total number of flows in a given state
+ private ContextAwareMeter allSuccessfulMeter;
+ private ContextAwareMeter allFailedMeter;
+ private ContextAwareMeter allRunningMeter;
+ private ContextAwareMeter allSlaExceededMeter;
+ private ContextAwareMeter allStartSlaExceededMeter;
+ // Meters representing the flows in a given state per flowgroup
+ private final Map<String, ContextAwareMeter> groupSuccessfulMeters = Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> groupFailureMeters = Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> groupStartSlaExceededMeters = Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> groupSlaExceededMeters = Maps.newConcurrentMap();
+
+ // Meters representing the jobs in a given state per executor
+ // These metrics need to be invoked differently to account for automated retries and multihop scenarios.
+ private final Map<String, ContextAwareMeter> executorSuccessMeters = Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> executorFailureMeters = Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> executorStartSlaExceededMeters = Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> executorSlaExceededMeters = Maps.newConcurrentMap();
+ private final Map<String, ContextAwareMeter> executorJobSentMeters = Maps.newConcurrentMap();
+ MetricContext metricContext;
+
+ public DagManagerMetrics(MetricContext metricContext) {
+ this.metricContext = metricContext;
+ }
+
+ public void activate() {
+ if (this.metricContext != null) {
+ allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SUCCESSFUL_FLOW_METER));
+ allFailedMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.FAILED_FLOW_METER));
+ allStartSlaExceededMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER));
+ allSlaExceededMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER));
+ allRunningMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
+ }
+ }
+
+ public void registerFlowMetric(FlowId flowId, Dag<JobExecutionPlan> dag) {
+ // Do not register flow-specific metrics for an adhoc flow
+ if (!flowGauges.containsKey(flowId.toString()) && DagManagerUtils.shouldFlowOutputMetrics(dag)) {
+ String flowStateGaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowId.getFlowGroup(),
+ flowId.getFlowName(), ServiceMetricNames.RUNNING_STATUS);
+ flowGauges.put(flowId.toString(), DagManager.FlowState.RUNNING);
+ ContextAwareGauge<Integer> gauge = RootMetricContext
+ .get().newContextAwareGauge(flowStateGaugeName, () -> flowGauges.get(flowId.toString()).value);
+ RootMetricContext.get().register(flowStateGaugeName, gauge);
+ }
+ }
+
+ public void incrementRunningJobMetrics(Dag.DagNode<JobExecutionPlan> dagNode) {
+ if (this.metricContext != null) {
+ this.getRunningJobsCounterForExecutor(dagNode).inc();
+ this.getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
+ }
+ }
+
+ public void decrementRunningJobMetrics(Dag.DagNode<JobExecutionPlan> dagNode) {
+ if (this.metricContext != null) {
+ this.getRunningJobsCounterForExecutor(dagNode).dec();
+ this.getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::dec);
+ }
+ }
+
+ /**
+ * Updates flowGauges with the appropriate state if the gauge is being tracked for the flow
+ * @param flowId
+ * @param state
+ */
+ public void conditionallyMarkFlowAsState(FlowId flowId, DagManager.FlowState state) {
+ if (flowGauges.containsKey(flowId.toString())) {
+ flowGauges.put(flowId.toString(), state);
+ }
+ }
+
+ public void emitFlowSuccessMetrics(FlowId flowId) {
+ if (this.metricContext != null) {
+ this.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.SUCCESSFUL);
+ this.allSuccessfulMeter.mark();
+ this.getGroupMeterForDag(flowId.getFlowGroup(), ServiceMetricNames.SUCCESSFUL_FLOW_METER, groupSuccessfulMeters).mark();
+ }
+ }
+
+ public void emitFlowFailedMetrics(FlowId flowId) {
+ if (this.metricContext != null) {
+ this.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.FAILED);
+ this.allFailedMeter.mark();
+ this.getGroupMeterForDag(flowId.getFlowGroup(), ServiceMetricNames.FAILED_FLOW_METER, groupFailureMeters).mark();
+ }
+ }
+
+ public void emitFlowSlaExceededMetrics(FlowId flowId) {
+ if (this.metricContext != null) {
+ this.conditionallyMarkFlowAsState(flowId, DagManager.FlowState.FAILED);
+ this.allSlaExceededMeter.mark();
+ this.getGroupMeterForDag(flowId.getFlowGroup(), ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER, groupSlaExceededMeters).mark();
+ }
+ }
+
+ public void incrementExecutorSuccess(Dag.DagNode<JobExecutionPlan> node) {
+ if (this.metricContext != null) {
+ this.getExecutorMeterForDag(node, ServiceMetricNames.SUCCESSFUL_FLOW_METER, executorSuccessMeters).mark();
+ }
+ }
+
+ public void incrementExecutorFailed(Dag.DagNode<JobExecutionPlan> node) {
+ if (this.metricContext != null) {
+ this.getExecutorMeterForDag(node, ServiceMetricNames.FAILED_FLOW_METER, executorFailureMeters).mark();
+ }
+ }
+
+ public void incrementExecutorSlaExceeded(Dag.DagNode<JobExecutionPlan> node) {
+ if (this.metricContext != null) {
+ this.getExecutorMeterForDag(node, ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER, executorSlaExceededMeters).mark();
+ }
+ }
+
+ public void incrementJobsSentToExecutor(Dag.DagNode<JobExecutionPlan> node) {
+ if (this.metricContext != null) {
+ this.getExecutorMeterForDag(node, ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR, executorJobSentMeters).mark();
+ this.allRunningMeter.mark();
+ }
+ }
+
+ // Increment the counts for start sla during the flow submission rather than cleanup to account for retries obfuscating this metric
+ public void incrementCountsStartSlaExceeded(Dag.DagNode<JobExecutionPlan> node) {
+ String flowGroup = node.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ if (this.metricContext != null) {
+ this.getGroupMeterForDag(flowGroup, ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER, groupStartSlaExceededMeters);
+ this.allStartSlaExceededMeter.mark();
+ this.getExecutorMeterForDag(node, ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER, executorStartSlaExceededMeters).mark();
+ }
+ }
+
+ private List<ContextAwareCounter> getRunningJobsCounterForUser(Dag.DagNode<JobExecutionPlan> dagNode) {
+ Config configs = dagNode.getValue().getJobSpec().getConfig();
+ String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null);
+ List<ContextAwareCounter> counters = new ArrayList<>();
+
+ if (StringUtils.isNotEmpty(proxy)) {
+ counters.add(this.metricContext.contextAwareCounter(
+ MetricRegistry.name(
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SERVICE_USERS, proxy)));
+ }
+
+ try {
+ String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+ if (StringUtils.isNotEmpty(serializedRequesters)) {
+ List<ServiceRequester> requesters = RequesterService.deserialize(serializedRequesters);
+ for (ServiceRequester requester : requesters) {
+ counters.add(this.metricContext.contextAwareCounter(MetricRegistry
+ .name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SERVICE_USERS, requester.getName())));
+ }
+ }
+ } catch (IOException e) {
+ log.error("Error while fetching requester list.", e);
+ }
+
+ return counters;
+ }
+
+ private ContextAwareCounter getRunningJobsCounterForExecutor(Dag.DagNode<JobExecutionPlan> dagNode) {
+ return this.metricContext.contextAwareCounter(
+ MetricRegistry.name(
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ DagManagerUtils.getSpecExecutorName(dagNode),
+ ServiceMetricNames.RUNNING_FLOWS_COUNTER));
+ }
+
+
+ private ContextAwareMeter getGroupMeterForDag(String flowGroup, String meterName, Map<String, ContextAwareMeter> meterMap) {
+ return meterMap.computeIfAbsent(flowGroup,
+ group -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, group, meterName)));
+ }
+
+ /**
+ * Used to track metrics for different specExecutors to detect issues with the specExecutor itself
+ * @param dagNode
+ * @param meterName
+ * @param meterMap
+ * @return
+ */
+ private ContextAwareMeter getExecutorMeterForDag(Dag.DagNode<JobExecutionPlan> dagNode, String meterName, Map<String, ContextAwareMeter> meterMap) {
+ String executorName = DagManagerUtils.getSpecExecutorName(dagNode);
+ return meterMap.computeIfAbsent(executorName,
+ executorUri -> metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, executorUri, meterName)));
+ }
+
+
+ @VisibleForTesting
+ protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
+ return new MetricNameRegexFilter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "\\..*\\." + ServiceMetricNames.RUNNING_STATUS);
+ }
+
+ public void cleanup() {
+ // The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
+ // To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
+ RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
+ }
+}
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
private DagStateStore _dagStateStore;
private DagStateStore _failedDagStateStore;
private JobStatusRetriever _jobStatusRetriever;
+ private DagManagerMetrics _dagManagerMetrics;
private DagManager.DagManagerThread _dagManagerThread;
private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue;
private LinkedBlockingQueue<String> cancelQueue;
Config quotaConfig = ConfigFactory.empty()
.withValue(UserQuotaManager.PER_USER_QUOTA, ConfigValueFactory.fromAnyRef("user:1"));
this._gobblinServiceQuotaManager = new UserQuotaManager(quotaConfig);
+ this._dagManagerMetrics = new DagManagerMetrics(metricContext);
+ this._dagManagerMetrics.activate();
this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, _failedDagStateStore, queue, cancelQueue,
- resumeQueue, true, new HashSet<>(), metricContext.contextAwareMeter("successMeter"),
- metricContext.contextAwareMeter("failedMeter"), new HashMap<>(), START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
+ resumeQueue, true, new HashSet<>(), this._dagManagerMetrics, START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);
// The start time should be 16 minutes ago, which is past the start SLA so the job should be cancelled
Iterator<JobStatus> jobStatusIterator1 =
getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, flowGroup, String.valueOf(ExecutionStatus.ORCHESTRATED),
- false, flowExecutionId - 16 * 60 * 1000);
+ false, flowExecutionId - Duration.ofMinutes(16).toMillis());
// This is for the second Dag that does not match the SLA so should schedule normally
Iterator<JobStatus> jobStatusIterator2 =
getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0, flowGroup1, String.valueOf(ExecutionStatus.ORCHESTRATED),
- false, flowExecutionId - 10 * 60 * 1000);
+ false, flowExecutionId - Duration.ofMinutes(10).toMillis());
// Let the first job get reported as cancel due to SLA kill on start and clean up
Iterator<JobStatus> jobStatusIterator3 =
getMockJobStatus(flowName, flowGroup, flowExecutionId, jobName0, flowGroup, String.valueOf(ExecutionStatus.CANCELLED),
- false, flowExecutionId - 16 * 60 * 1000);
+ false, flowExecutionId - Duration.ofMinutes(16).toMillis());
// Cleanup the running job that is scheduled normally
Iterator<JobStatus> jobStatusIterator4 =
getMockJobStatus(flowName1, flowGroup1, flowExecutionId+1, jobName0, flowGroup1, String.valueOf(ExecutionStatus.COMPLETE));
@Test (dependsOnMethods = "testJobStartSLAKilledDag")
public void testJobKilledSLAMetricsArePerExecutor() throws URISyntaxException, IOException {
long flowExecutionId = System.currentTimeMillis();
+ // The start time should be 16 minutes ago, which is past the start SLA so the job should be cancelled
+ long startOrchestrationTime = flowExecutionId - Duration.ofMinutes(16).toMillis();
Config executorOneConfig = ConfigFactory.empty()
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorOne"))
.withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId));
List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "user", executorOneConfig);
dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "user", executorTwoConfig));
+ String allSlaKilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+ long previousSlaKilledCount = metricContext.getParent().get().getMeters().get(allSlaKilledMeterName) == null ? 0 :
+ metricContext.getParent().get().getMeters().get(allSlaKilledMeterName).getCount();
+
//Add a dag to the queue of dags
this.queue.offer(dagList.get(0));
this.queue.offer(dagList.get(1));
this.queue.offer(dagList.get(2));;
- // The start time should be 16 minutes ago, which is past the start SLA so the job should be cancelled
Iterator<JobStatus> jobStatusIterator1 =
getMockJobStatus("flow0", "group0", flowExecutionId, "job0", "group0", String.valueOf(ExecutionStatus.ORCHESTRATED),
- false, flowExecutionId - 16 * 60 * 1000);
+ false, startOrchestrationTime);
Iterator<JobStatus> jobStatusIterator2 =
getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0", "group1", String.valueOf(ExecutionStatus.ORCHESTRATED),
- false, flowExecutionId - 16 * 60 * 1000);
+ false, startOrchestrationTime);
Iterator<JobStatus> jobStatusIterator3 =
getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0", "group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
- false, flowExecutionId - 16 * 60 * 1000);
+ false, startOrchestrationTime);
Mockito.when(_jobStatusRetriever
.getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
thenReturn(jobStatusIterator2).
thenReturn(jobStatusIterator3);
- // Run the thread once. All 3 jobs should be emitted an sla exceeded event
+ // Run the thread once. All 3 jobs should be emitted an SLA exceeded event
this._dagManagerThread.run();
String slakilledMeterName1 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
String slakilledMeterName2 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo", ServiceMetricNames.START_SLA_EXCEEDED_FLOWS_METER);
+
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(), 2);
Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(), 1);
// Cleanup
this._dagManagerThread.run();
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(allSlaKilledMeterName).getCount(), previousSlaKilledCount + 3);
+
Assert.assertEquals(this.dags.size(), 0);
Assert.assertEquals(this.jobToDag.size(), 0);
Assert.assertEquals(this.dagToJobs.size(), 0);
Assert.assertEquals(metricContext.getParent().get().getGauges().get(flowStateGaugeName1).getValue(), DagManager.FlowState.SUCCESSFUL.value);
}
+ @Test (dependsOnMethods = "testEmitFlowMetricOnlyIfNotAdhoc")
+ public void testJobSlaKilledMetrics() throws URISyntaxException, IOException {
+ long flowExecutionId = System.currentTimeMillis() - Duration.ofMinutes(20).toMillis();
+ Config executorOneConfig = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorOne"))
+ .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10));
+ Config executorTwoConfig = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorTwo"))
+ .withValue(ConfigurationKeys.GOBBLIN_FLOW_SLA_TIME, ConfigValueFactory.fromAnyRef(10));
+ List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "newUser", executorOneConfig);
+ dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "newUser", executorTwoConfig));
+
+ String allSlaKilledMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
+ long previousSlaKilledCount = metricContext.getParent().get().getMeters().get(allSlaKilledMeterName) == null ? 0 :
+ metricContext.getParent().get().getMeters().get(allSlaKilledMeterName).getCount();
+
+ //Add a dag to the queue of dags
+ this.queue.offer(dagList.get(0));
+ this.queue.offer(dagList.get(1));
+ this.queue.offer(dagList.get(2));;
+ // Set orchestration time to be 20 minutes in the past, the job should be marked as SLA killed
+ Iterator<JobStatus> jobStatusIterator1 =
+ getMockJobStatus("flow0", "group0", flowExecutionId, "job0", "group0", String.valueOf(ExecutionStatus.RUNNING),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator2 =
+ getMockJobStatus("flow1", "flow1", flowExecutionId, "job0", "group1", String.valueOf(ExecutionStatus.RUNNING),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator3 =
+ getMockJobStatus("flow2", "flow2", flowExecutionId, "job0", "group2", String.valueOf(ExecutionStatus.RUNNING),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator4 =
+ getMockJobStatus("flow0", "flow0", flowExecutionId, "job0", "group0", String.valueOf(ExecutionStatus.CANCELLED),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator5 =
+ getMockJobStatus("flow1", "flow1", flowExecutionId, "job0", "group1", String.valueOf(ExecutionStatus.CANCELLED),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator6 =
+ getMockJobStatus("flow2", "flow2", flowExecutionId, "job0", "group2", String.valueOf(ExecutionStatus.CANCELLED),
+ false, flowExecutionId);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+ thenReturn(jobStatusIterator1).
+ thenReturn(jobStatusIterator2).
+ thenReturn(jobStatusIterator3).
+ thenReturn(jobStatusIterator4).
+ thenReturn(jobStatusIterator5).
+ thenReturn(jobStatusIterator6);
+
+ // Run the thread once. All 3 jobs should be emitted an SLA exceeded event
+ this._dagManagerThread.run();
+ this._dagManagerThread.run();
+
+ String slakilledMeterName1 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
+ String slakilledMeterName2 = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorTwo", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
+ String failedFlowGauge = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group1","flow1", ServiceMetricNames.RUNNING_STATUS);
+
+ String slakilledGroupName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER);
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName1).getCount(), 2);
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledMeterName2).getCount(), 1);
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(slakilledGroupName).getCount(), 1);
+ // Cleanup
+ this._dagManagerThread.run();
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(allSlaKilledMeterName).getCount(), previousSlaKilledCount + 3);
+ Assert.assertEquals(metricContext.getParent().get().getGauges().get(failedFlowGauge).getValue(), -1);
+
+ Assert.assertEquals(this.dags.size(), 0);
+ Assert.assertEquals(this.jobToDag.size(), 0);
+ Assert.assertEquals(this.dagToJobs.size(), 0);
+ }
+
+ @Test (dependsOnMethods = "testJobSlaKilledMetrics")
+ public void testPerExecutorMetricsSuccessFails() throws URISyntaxException, IOException {
+ long flowExecutionId = System.currentTimeMillis();
+ Config executorOneConfig = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorOne"))
+ .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId));
+ Config executorTwoConfig = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef("executorTwo"));
+ List<Dag<JobExecutionPlan>> dagList = buildDagList(2, "newUser", executorOneConfig);
+ dagList.add(buildDag("2", flowExecutionId, "FINISH_RUNNING", 1, "newUser", executorTwoConfig));
+ // Get global metric count before any changes are applied
+ String allSuccessfulFlowsMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.SUCCESSFUL_FLOW_METER);
+ long previousSuccessCount = metricContext.getParent().get().getMeters().get(allSuccessfulFlowsMeterName) == null ? 0 :
+ metricContext.getParent().get().getMeters().get(allSuccessfulFlowsMeterName).getCount();
+ String previousJobSentToExecutorMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne", ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR);
+ long previousJobSentToExecutorCount = metricContext.getParent().get().getMeters().get(previousJobSentToExecutorMeterName) == null ? 0 :
+ metricContext.getParent().get().getMeters().get(previousJobSentToExecutorMeterName).getCount();
+
+ //Add a dag to the queue of dags
+ this.queue.offer(dagList.get(0));
+ this.queue.offer(dagList.get(1));
+ this.queue.offer(dagList.get(2));;
+ // The start time should be 16 minutes ago, which is past the start SLA so the job should be cancelled
+ Iterator<JobStatus> jobStatusIterator1 =
+ getMockJobStatus( "flow0", "group0", flowExecutionId, "job0", "group0", String.valueOf(ExecutionStatus.ORCHESTRATED),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator2 =
+ getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0", "group1", String.valueOf(ExecutionStatus.ORCHESTRATED),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator3 =
+ getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0", "group2", String.valueOf(ExecutionStatus.ORCHESTRATED),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator4 =
+ getMockJobStatus( "flow0", "flow0", flowExecutionId+1, "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator5 =
+ getMockJobStatus("flow1", "flow1", flowExecutionId+1, "job0", "group1", String.valueOf(ExecutionStatus.FAILED),
+ false, flowExecutionId);
+ Iterator<JobStatus> jobStatusIterator6 =
+ getMockJobStatus("flow2", "flow2", flowExecutionId+1, "job0", "group2", String.valueOf(ExecutionStatus.COMPLETE),
+ false, flowExecutionId);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow0"), Mockito.eq("group0"), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+ thenReturn(jobStatusIterator1).
+ thenReturn(jobStatusIterator4);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow1"), Mockito.eq("group1"), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+ thenReturn(jobStatusIterator2).
+ thenReturn(jobStatusIterator5);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow2"), Mockito.eq("group2"), Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).
+ thenReturn(jobStatusIterator3).
+ thenReturn(jobStatusIterator6);
+
+ this._dagManagerThread.run();
+
+ String slaSuccessfulFlowsExecutorOneMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne", ServiceMetricNames.SUCCESSFUL_FLOW_METER);
+ String slaFailedFlowsExecutorOneMeterName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "executorOne", ServiceMetricNames.FAILED_FLOW_METER);
+ String failedFlowGauge = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group1", "flow1", ServiceMetricNames.RUNNING_STATUS);
+
+ this._dagManagerThread.run();
+
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(slaSuccessfulFlowsExecutorOneMeterName).getCount(), 1);
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(slaFailedFlowsExecutorOneMeterName).getCount(), 1);
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(allSuccessfulFlowsMeterName).getCount(), previousSuccessCount + 2);
+ Assert.assertEquals(metricContext.getParent().get().getMeters().get(previousJobSentToExecutorMeterName).getCount(), previousJobSentToExecutorCount + 2);
+ Assert.assertEquals(metricContext.getParent().get().getGauges().get(failedFlowGauge).getValue(), -1);
+ // Cleanup
+ this._dagManagerThread.run();
+
+ Assert.assertEquals(this.dags.size(), 0);
+ Assert.assertEquals(this.jobToDag.size(), 0);
+ Assert.assertEquals(this.dagToJobs.size(), 0);
+ }
+
+
@AfterClass
public void cleanUp() throws Exception {
@Test
public void matchesTest() {
- MetricNameRegexFilter metricNameRegexForDagManager = DagManager.getMetricsFilterForDagManager();
+ MetricNameRegexFilter metricNameRegexForDagManager = DagManagerMetrics.getMetricsFilterForDagManager();
Assert.assertTrue(metricNameRegexForDagManager.matches("GobblinService.testGroup.testFlow.RunningStatus", mock(Metric.class)));
Assert.assertTrue(metricNameRegexForDagManager.matches("GobblinService.test..RunningStatus", mock(Metric.class)));
Assert.assertFalse(metricNameRegexForDagManager.matches("test3.RunningStatus", mock(Metric.class)));