TEZ-3969. TaskAttemptImpl: static fields initialized in instance ctor (Jaume Marhuend...
authorJaume Marhuenda <jmarhuenda@hortonworks.com>
Tue, 9 Oct 2018 18:12:17 +0000 (13:12 -0500)
committerJonathan Eagles <jeagles@yahoo-inc.com>
Tue, 9 Oct 2018 18:12:17 +0000 (13:12 -0500)
tez-dag/findbugs-exclude.xml
tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java

index 1150ccb..a6ce380 100644 (file)
     <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/>
   </Match>
 
-  <!-- TEZ-2552 -->
-  <Match>
-    <Class name="org.apache.tez.dag.app.dag.impl.TaskAttemptImpl"/>
-    <Or>
-      <Field name="MAX_ALLOWED_OUTPUT_FAILURES_FRACTION"/>
-      <Field name="MAX_ALLOWED_OUTPUT_FAILURES"/>
-      <Field name="MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC"/>
-    </Or>
-    <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD"/>
-  </Match>
-
 </FindBugsFilter>
index 0e54e9f..0b2406f 100644 (file)
@@ -213,6 +213,19 @@ public interface Vertex extends Comparable<Vertex> {
     int getMaxFailedTaskAttempts();
     boolean getTaskRescheduleHigherPriority();
     boolean getTaskRescheduleRelaxedLocality();
+
+    /**
+     * @return tez.task.max.allowed.output.failures.
+     */
+    int getMaxAllowedOutputFailures();
+    /**
+     * @return tez.task.max.allowed.output.failures.fraction.
+     */
+    double getMaxAllowedOutputFailuresFraction();
+    /**
+     * @return tez.am.max.allowed.time-sec.for-read-error.
+     */
+    int getMaxAllowedTimeForTaskReadErrorSec();
   }
 
   void incrementRejectedTaskAttemptCount();
index bbec9ea..7399979 100644 (file)
@@ -214,9 +214,6 @@ public class TaskAttemptImpl implements TaskAttempt,
   Set<String> taskRacks = new HashSet<String>();
 
   private Map<TezTaskAttemptID, Long> uniquefailedOutputReports = Maps.newHashMap();
-  private static double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION;
-  private static int MAX_ALLOWED_OUTPUT_FAILURES;
-  private static int MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC;
 
   protected final boolean isRescheduled;
   private final Resource taskResource;
@@ -548,18 +545,6 @@ public class TaskAttemptImpl implements TaskAttempt,
       Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec,
       TezTaskAttemptID schedulingCausalTA) {
 
-    // TODO: Move these configs over to Vertex.VertexConfig
-    MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
-        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
-        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT);
-
-    MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration
-        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration
-        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT);
-    
-    MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC = conf.getInt(
-        TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC,
-        TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT);
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -1793,17 +1778,24 @@ public class TaskAttemptImpl implements TaskAttempt,
         attempt.uniquefailedOutputReports.put(failedDestTaId, time);
         firstErrReportTime = time;
       }
-      
+
+      int maxAllowedOutputFailures = attempt.getVertex().getVertexConfig()
+          .getMaxAllowedOutputFailures();
+      int maxAllowedTimeForTaskReadErrorSec = attempt.getVertex()
+          .getVertexConfig().getMaxAllowedTimeForTaskReadErrorSec();
+      double maxAllowedOutputFailuresFraction = attempt.getVertex()
+          .getVertexConfig().getMaxAllowedOutputFailuresFraction();
+
       int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000);
-      boolean crossTimeDeadline = readErrorTimespanSec >= MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC;
+      boolean crossTimeDeadline = readErrorTimespanSec >= maxAllowedTimeForTaskReadErrorSec;
 
       int runningTasks = attempt.appContext.getCurrentDAG().getVertex(
           failedDestTaId.getTaskID().getVertexID()).getRunningTasks();
       float failureFraction = runningTasks > 0 ? ((float) attempt.uniquefailedOutputReports.size()) / runningTasks : 0;
       boolean withinFailureFractionLimits =
-          (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION);
+          (failureFraction <= maxAllowedOutputFailuresFraction);
       boolean withinOutputFailureLimits =
-          (attempt.uniquefailedOutputReports.size() < MAX_ALLOWED_OUTPUT_FAILURES);
+          (attempt.uniquefailedOutputReports.size() < maxAllowedOutputFailures);
 
       // If needed we can launch a background task without failing this task
       // to generate a copy of the output just in case.
@@ -1813,10 +1805,12 @@ public class TaskAttemptImpl implements TaskAttempt,
       }
       String message = attempt.getID() + " being failed for too many output errors. "
           + "failureFraction=" + failureFraction
-          + ", MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" + MAX_ALLOWED_OUTPUT_FAILURES_FRACTION
+          + ", MAX_ALLOWED_OUTPUT_FAILURES_FRACTION="
+          + maxAllowedOutputFailuresFraction
           + ", uniquefailedOutputReports=" + attempt.uniquefailedOutputReports.size()
-          + ", MAX_ALLOWED_OUTPUT_FAILURES=" + MAX_ALLOWED_OUTPUT_FAILURES
-          + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC=" + MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC
+          + ", MAX_ALLOWED_OUTPUT_FAILURES=" + maxAllowedOutputFailures
+          + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC="
+          + maxAllowedTimeForTaskReadErrorSec
           + ", readErrorTimespan=" + readErrorTimespanSec;
       LOG.info(message);
       attempt.addDiagnosticInfo(message);
index 0184657..a4d2de1 100644 (file)
@@ -4690,6 +4690,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     private final boolean taskRescheduleHigherPriority;
     private final boolean taskRescheduleRelaxedLocality;
 
+    /**
+     * See tez.task.max.allowed.output.failures.fraction.
+     */
+    private final double maxAllowedOutputFailuresFraction;
+    /**
+     * See tez.task.max.allowed.output.failures.
+     */
+    private final int maxAllowedOutputFailures;
+    /**
+     * See tez.am.max.allowed.time-sec.for-read-error.
+     */
+    private final int maxAllowedTimeForTaskReadErrorSec;
+
     public VertexConfigImpl(Configuration conf) {
       this.maxFailedTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
           TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
@@ -4699,6 +4712,18 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       this.taskRescheduleRelaxedLocality =
           conf.getBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY,
               TezConfiguration.TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY_DEFAULT);
+
+      this.maxAllowedOutputFailures = conf.getInt(TezConfiguration
+          .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
+          .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT);
+
+      this.maxAllowedOutputFailuresFraction = conf.getDouble(TezConfiguration
+          .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration
+          .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT);
+
+      this.maxAllowedTimeForTaskReadErrorSec = conf.getInt(
+          TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC,
+          TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT);
     }
 
     @Override
@@ -4715,5 +4740,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     public boolean getTaskRescheduleRelaxedLocality() {
       return taskRescheduleRelaxedLocality;
     }
+
+    /**
+     * @return maxAllowedOutputFailures.
+     */
+    @Override public int getMaxAllowedOutputFailures() {
+      return maxAllowedOutputFailures;
+    }
+
+    /**
+     * @return maxAllowedOutputFailuresFraction.
+     */
+    @Override public double getMaxAllowedOutputFailuresFraction() {
+      return maxAllowedOutputFailuresFraction;
+    }
+
+    /**
+     * @return maxAllowedTimeForTaskReadErrorSec.
+     */
+    @Override public int getMaxAllowedTimeForTaskReadErrorSec() {
+      return maxAllowedTimeForTaskReadErrorSec;
+    }
   }
 }
index 5ab68f7..5038810 100644 (file)
@@ -160,15 +160,20 @@ public class TestTaskAttempt {
     when(appCtx.getContainerLauncherName(anyInt())).thenReturn(
         TezConstants.getTezYarnServicePluginName());
 
-    mockVertex = mock(Vertex.class);
-    when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);
-    when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(vertexConf));
+    createMockVertex(vertexConf);
 
     HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
     doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
     LogManager.getRootLogger().setLevel(Level.DEBUG);
   }
 
+  private void createMockVertex(Configuration conf) {
+    mockVertex = mock(Vertex.class);
+    when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);
+    when(mockVertex.getVertexConfig()).thenReturn(
+        new VertexImpl.VertexConfigImpl(conf));
+  }
+
   @Test(timeout = 5000)
   public void testLocalityRequest() {
     TaskAttemptImpl.ScheduleTaskattemptTransition sta =
@@ -1919,7 +1924,11 @@ public class TestTaskAttempt {
     verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(
         arg.capture());
 
-    taskConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, 1);
+    Configuration newVertexConf = new Configuration(vertexConf);
+    newVertexConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES,
+        1);
+    createMockVertex(newVertexConf);
+
     TezTaskID taskID2 = TezTaskID.getInstance(vertexID, 2);
     MockTaskAttemptImpl taImpl2 = new MockTaskAttemptImpl(taskID2, 1, eventHandler,
         taListener, taskConf, new SystemClock(),
@@ -1953,8 +1962,15 @@ public class TestTaskAttempt {
 
     Clock mockClock = mock(Clock.class); 
     int readErrorTimespanSec = 1;
-    taskConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, 10);
-    taskConf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, readErrorTimespanSec);
+
+    newVertexConf = new Configuration(vertexConf);
+    newVertexConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES,
+        10);
+    newVertexConf.setInt(
+        TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC,
+        readErrorTimespanSec);
+    createMockVertex(newVertexConf);
+
     TezTaskID taskID3 = TezTaskID.getInstance(vertexID, 3);
     MockTaskAttemptImpl taImpl3 = new MockTaskAttemptImpl(taskID3, 1, eventHandler,
         taListener, taskConf, mockClock,