TEZ-3972. Tez DAG can hang when a single task fails to fetch (Kuhu Shukla via jeagles)
author Jonathan Eagles <jeagles@yahoo-inc.com>
Tue, 18 Sep 2018 22:14:44 +0000 (17:14 -0500)
committer Jonathan Eagles <jeagles@yahoo-inc.com>
Tue, 18 Sep 2018 22:14:44 +0000 (17:14 -0500)
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java

index 6ad41f8..bbec9ea 100644 (file)
@@ -1797,9 +1797,9 @@ public class TaskAttemptImpl implements TaskAttempt,
       int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000);
       boolean crossTimeDeadline = readErrorTimespanSec >= MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC;
 
-      float failureFraction = ((float) attempt.uniquefailedOutputReports.size())
-          / outputFailedEvent.getConsumerTaskNumber();
-
+      int runningTasks = attempt.appContext.getCurrentDAG().getVertex(
+          failedDestTaId.getTaskID().getVertexID()).getRunningTasks();
+      float failureFraction = runningTasks > 0 ? ((float) attempt.uniquefailedOutputReports.size()) / runningTasks : 0;
       boolean withinFailureFractionLimits =
           (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION);
       boolean withinOutputFailureLimits =
index 503e418..5ab68f7 100644 (file)
@@ -1820,6 +1820,7 @@ public class TestTaskAttempt {
     doReturn(containers).when(appCtx).getAllContainers();
     HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
     doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
+    DAGImpl mockDAG = mock(DAGImpl.class);
 
     TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
     MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
@@ -1852,6 +1853,14 @@ public class TestTaskAttempt {
     EventMetaData mockMeta = mock(EventMetaData.class);
     TezTaskAttemptID mockDestId1 = mock(TezTaskAttemptID.class);
     when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
+    TezTaskID destTaskID = mock(TezTaskID.class);
+    TezVertexID destVertexID = mock(TezVertexID.class);
+    when(mockDestId1.getTaskID()).thenReturn(destTaskID);
+    when(destTaskID.getVertexID()).thenReturn(destVertexID);
+    Vertex destVertex = mock(VertexImpl.class);
+    when(destVertex.getRunningTasks()).thenReturn(11);
+    when(mockDAG.getVertex(destVertexID)).thenReturn(destVertex);
+    when(appCtx.getCurrentDAG()).thenReturn(mockDAG);
     TezEvent tzEvent = new TezEvent(mockReEvent, mockMeta);
     taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
     
@@ -1868,7 +1877,14 @@ public class TestTaskAttempt {
 
     // different destination attempt reports error. now threshold crossed
     TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class);
-    when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId2);    
+    when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId2);
+    destTaskID = mock(TezTaskID.class);
+    destVertexID = mock(TezVertexID.class);
+    when(mockDestId2.getTaskID()).thenReturn(destTaskID);
+    when(destTaskID.getVertexID()).thenReturn(destVertexID);
+    destVertex = mock(VertexImpl.class);
+    when(destVertex.getRunningTasks()).thenReturn(11);
+    when(mockDAG.getVertex(destVertexID)).thenReturn(destVertex);
     taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
     
     assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
@@ -1923,6 +1939,7 @@ public class TestTaskAttempt {
     mockReEvent = InputReadErrorEvent.create("", 1, 1);
     mockMeta = mock(EventMetaData.class);
     mockDestId1 = mock(TezTaskAttemptID.class);
+    when(mockDestId1.getTaskID()).thenReturn(destTaskID);
     when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
     tzEvent = new TezEvent(mockReEvent, mockMeta);
     //This should fail even when MAX_ALLOWED_OUTPUT_FAILURES_FRACTION is within limits, as
@@ -1957,9 +1974,11 @@ public class TestTaskAttempt {
     mockReEvent = InputReadErrorEvent.create("", 1, 1);
     mockMeta = mock(EventMetaData.class);
     mockDestId1 = mock(TezTaskAttemptID.class);
+    when(mockDestId1.getTaskID()).thenReturn(destTaskID);
     when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
     tzEvent = new TezEvent(mockReEvent, mockMeta);
     when(mockClock.getTime()).thenReturn(1000L);
+    when(destVertex.getRunningTasks()).thenReturn(1000);
     // time deadline not exceeded for a couple of read error events
     taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent, 1000));
     assertEquals("Task attempt is not in succeeded state", taImpl3.getState(),
@@ -1978,6 +1997,93 @@ public class TestTaskAttempt {
     verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID3);
   }
 
+  @Test(timeout = 60000)
+  public void testTAFailureBasedOnRunningTasks() throws Exception { // SUCCEEDED attempt must go to FAILED after one read-error report when the destination vertex has 5 running tasks
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler mockEh = new MockEventHandler();
+    MockEventHandler eventHandler = spy(mockEh); // spied so the handle() calls can be counted and captured below
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container, 0, 0, 0);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+    HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
+    doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
+    DAGImpl mockDAG = mock(DAGImpl.class); // wired into appCtx.getCurrentDAG() further down
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID = taImpl.getID();
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); // drive the attempt: schedule -> submitted -> started -> done
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
+    verify(mockHeartbeatHandler).register(taskAttemptID);
+    taImpl.handle(new TaskAttemptEvent(taskAttemptID,
+        TaskAttemptEventType.TA_DONE));
+    assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    verify(mockHeartbeatHandler).unregister(taskAttemptID);
+
+    int expectedEventsTillSucceeded = 8; // number of handle() calls expected on eventHandler up to this point
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    ArgumentCaptor<DAGHistoryEvent> histArg = ArgumentCaptor.forClass(DAGHistoryEvent.class);
+    verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture());
+    verify(mockHistHandler, times(2)).handle(histArg.capture()); // two history events so far: attempt start and finish
+    DAGHistoryEvent histEvent = histArg.getValue();
+    TaskAttemptFinishedEvent finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent();
+    long finishTime = finishEvent.getFinishTime(); // NOTE(review): finishTime is not read again in this test
+    verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2);
+
+    InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 1);
+    EventMetaData mockMeta = mock(EventMetaData.class);
+    TezTaskAttemptID mockDestId1 = mock(TezTaskAttemptID.class);
+    when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
+    TezTaskID destTaskID = mock(TezTaskID.class);
+    TezVertexID destVertexID = mock(TezVertexID.class);
+    when(mockDestId1.getTaskID()).thenReturn(destTaskID); // stub the attempt -> task -> vertex id chain for the reporting destination
+    when(destTaskID.getVertexID()).thenReturn(destVertexID);
+    Vertex destVertex = mock(VertexImpl.class);
+    when(destVertex.getRunningTasks()).thenReturn(5); // destination vertex reports 5 running tasks
+    when(mockDAG.getVertex(destVertexID)).thenReturn(destVertex);
+    when(appCtx.getCurrentDAG()).thenReturn(mockDAG);
+    TezEvent tzEvent = new TezEvent(mockReEvent, mockMeta);
+    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11)); // NOTE(review): 11 is presumably the consumer task count; it differs from the 5 running tasks stubbed above - confirm
+
+    // a single read-error report against 5 running tasks is expected to cross the failure threshold, so the state is FAILED
+    assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+  }
+
   @SuppressWarnings("deprecation")
   @Test(timeout = 5000)
   public void testKilledInNew() throws ServicePluginException {