TEZ-3982. DAGAppMaster and tasks should not report negative or invalid progress ...
authorJonathan Eagles <jeagles@yahoo-inc.com>
Thu, 20 Sep 2018 15:26:42 +0000 (10:26 -0500)
committerJonathan Eagles <jeagles@yahoo-inc.com>
Thu, 20 Sep 2018 15:26:42 +0000 (10:26 -0500)
tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java
tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java

index 407a20e..07b066c 100644 (file)
@@ -47,7 +47,10 @@ public class ProgressHelper {
         if (inputs != null && inputs.size() != 0) {
           for (LogicalInput input : inputs.values()) {
             if (input instanceof AbstractLogicalInput) {
-              progSum += ((AbstractLogicalInput) input).getProgress();
+              float inputProgress = ((AbstractLogicalInput) input).getProgress();
+              if (inputProgress >= 0.0f && inputProgress <= 1.0f) {
+                progSum += inputProgress;
+              }
             }
           }
           progress = (1.0f) * progSum / inputs.size();
index 42a9d57..177ba56 100644 (file)
@@ -1195,7 +1195,7 @@ public class DAGAppMaster extends AbstractService {
   }
 
   public float getProgress() {
-    if (isSession && state.equals(DAGAppMasterState.IDLE)) {
+    if (isSession && getState().equals(DAGAppMasterState.IDLE)) {
       return 0.0f;
     }
     if(currentDAG != null) {
index 6dcc7f0..db51cee 100644 (file)
@@ -806,9 +806,17 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     try {
       float progress = 0.0f;
       for (Vertex v : getVertices().values()) {
-        progress += v.getProgress();
+        float vertexProgress = v.getProgress();
+        if (vertexProgress >= 0.0f && vertexProgress <= 1.0f) {
+          progress += vertexProgress;
+        }
+      }
+      float dagProgress = progress / getTotalVertices();
+      if (dagProgress >= 0.0f && progress <= 1.0f) {
+        return dagProgress;
+      } else {
+        return 0.0f;
       }
-      return progress / getTotalVertices();
     } finally {
       this.readLock.unlock();
     }
index 570c6dc..7a7dfe2 100644 (file)
 
 package org.apache.tez.dag.app;
 
+import org.apache.hadoop.yarn.util.MonotonicClock;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.records.TezVertexID;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
@@ -29,8 +35,10 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
@@ -387,6 +395,81 @@ public class TestDAGAppMaster {
     testDagCredentials(true);
   }
 
+  @Test
+  public void testBadProgress() throws Exception {
+    TezConfiguration conf = new TezConfiguration();
+    conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
+    conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+    // create some sample AM credentials
+    Credentials amCreds = new Credentials();
+    JobTokenSecretManager jtsm = new JobTokenSecretManager();
+    JobTokenIdentifier identifier = new JobTokenIdentifier(
+        new Text(appId.toString()));
+    Token<JobTokenIdentifier> sessionToken =
+        new Token<JobTokenIdentifier>(identifier, jtsm);
+    sessionToken.setService(identifier.getJobId());
+    TokenCache.setSessionToken(sessionToken, amCreds);
+    TestTokenSecretManager ttsm = new TestTokenSecretManager();
+    Text tokenAlias1 = new Text("alias1");
+    Token<TestTokenIdentifier> amToken1 = new Token<TestTokenIdentifier>(
+        new TestTokenIdentifier(new Text("amtoken1")), ttsm);
+    amCreds.addToken(tokenAlias1, amToken1);
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    FSDataOutputStream sessionJarsPBOutStream =
+        TezCommonUtils.createFileForAM(fs, new Path(TEST_DIR.toString(),
+            TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
+    DAGProtos.PlanLocalResourcesProto.getDefaultInstance()
+        .writeDelimitedTo(sessionJarsPBOutStream);
+    sessionJarsPBOutStream.close();
+    DAGAppMaster am = spy(new DAGAppMaster(attemptId,
+        ContainerId.newContainerId(attemptId, 1),
+        "127.0.0.1", 0, 0, new MonotonicClock(), 1, true,
+        TEST_DIR.toString(), new String[] {TEST_DIR.toString()},
+        new String[] {TEST_DIR.toString()},
+        new TezApiVersionInfo().getVersion(), amCreds,
+        "someuser", null));
+    when(am.getState()).thenReturn(DAGAppMasterState.RUNNING);
+    am.init(conf);
+    am.start();
+    Credentials dagCreds = new Credentials();
+    Token<TestTokenIdentifier> dagToken1 = new Token<TestTokenIdentifier>(
+        new TestTokenIdentifier(new Text("dagtoken1")), ttsm);
+    dagCreds.addToken(tokenAlias1, dagToken1);
+    Text tokenAlias3 = new Text("alias3");
+    Token<TestTokenIdentifier> dagToken2 = new Token<TestTokenIdentifier>(
+        new TestTokenIdentifier(new Text("dagtoken2")), ttsm);
+    dagCreds.addToken(tokenAlias3, dagToken2);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    DAGPlan dagPlan = DAGPlan.newBuilder()
+        .setName("somedag")
+        .setCredentialsBinary(
+            DagTypeConverters.convertCredentialsToProto(dagCreds))
+        .build();
+    DAGImpl dag = spy(am.createDAG(dagPlan, dagId));
+    am.setCurrentDAG(dag);
+    when(dag.getState()).thenReturn(DAGState.RUNNING);
+    Map<TezVertexID, Vertex> map = new HashMap<TezVertexID, Vertex>();
+    TezVertexID mockVertexID = mock(TezVertexID.class);
+    Vertex mockVertex = mock(Vertex.class);
+    when(mockVertex.getProgress()).thenReturn(Float.NaN);
+    map.put(mockVertexID, mockVertex);
+    when(dag.getVertices()).thenReturn(map);
+    when(dag.getTotalVertices()).thenReturn(1);
+    Assert.assertEquals("Progress was NaN and should be reported as 0",
+        0, am.getProgress(), 0);
+    when(mockVertex.getProgress()).thenReturn(-10f);
+    Assert.assertEquals("Progress was negative and should be reported as 0",
+        0, am.getProgress(), 0);
+    when(mockVertex.getProgress()).thenReturn(10f);
+    Assert.assertEquals("Progress was greater than 1 and should be reported as 0",
+        0, am.getProgress(), 0);
+  }
+
   @SuppressWarnings("deprecation")
   private void testDagCredentials(boolean doMerge) throws IOException {
     TezConfiguration conf = new TezConfiguration();