TEZ-3976: Batch ShuffleManager error report events (Jaume Marhuenda, reviewed by...
authorJaume Marhuenda <jmarhuenda@hortonworks.com>
Tue, 23 Oct 2018 22:30:13 +0000 (15:30 -0700)
committerGopal V <gopalv@apache.org>
Tue, 23 Oct 2018 22:30:13 +0000 (15:30 -0700)
Signed-off-by: Gopal V <gopalv@apache.org>
tez-api/src/main/java/org/apache/tez/runtime/api/events/InputReadErrorEvent.java
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java

index 7d2e0d2..cabc39f 100644 (file)
@@ -21,6 +21,8 @@ package org.apache.tez.runtime.api.events;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.tez.runtime.api.Event;
 
+import java.util.Objects;
+
 /**
  * Event generated by an Input to indicate error when trying to retrieve data.
  * This is not necessarily a fatal event - it's an indication to the AM to retry
@@ -44,17 +46,31 @@ public final class InputReadErrorEvent extends Event {
    */
   private final int version;
 
-  private InputReadErrorEvent(String diagnostics, int index,
-                              int version) {
+  /**
+   * Number of failures.
+   */
+  private final int numFailures;
+
+  private InputReadErrorEvent(final String diagnostics, final int index,
+                              final int version, final int numFailures) {
     super();
     this.diagnostics = diagnostics;
     this.index = index;
     this.version = version;
+    this.numFailures = numFailures;
   }
 
   public static InputReadErrorEvent create(String diagnostics, int index,
                                            int version) {
-    return new InputReadErrorEvent(diagnostics, index, version);
+    return create(diagnostics, index, version, 1);
+  }
+
+  /**
+   * Create an InputReadErrorEvent.
+   */
+  public static InputReadErrorEvent create(final String diagnostics, final int index,
+      final int version, final int numFailures) {
+    return new InputReadErrorEvent(diagnostics, index, version, numFailures);
   }
 
   public String getDiagnostics() {
@@ -69,4 +85,27 @@ public final class InputReadErrorEvent extends Event {
     return version;
   }
 
+  /**
+   * @return number of failures
+   */
+  public int getNumFailures() {
+    return numFailures;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(index, version);
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    InputReadErrorEvent that = (InputReadErrorEvent) o;
+    return index == that.index && version == that.version;
+  }
 }
index 85c53a5..86792e2 100644 (file)
@@ -512,6 +512,15 @@ public class TezRuntimeConfiguration {
       TEZ_RUNTIME_PREFIX + "enable.final-merge.in.output";
   public static final boolean TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT = true;
 
+  /**
+   * Expert level setting. How long should @link{ShuffleManager} wait for batching
+   * before sending the events in milliseconds. Set to -1 to not wait.
+   */
+  @ConfigurationProperty(type = "integer")
+  public static final String  TEZ_RUNTIME_SHUFFLE_BATCH_WAIT =
+      TEZ_RUNTIME_PREFIX + "shuffle.batch.wait";
+  public static final int TEZ_RUNTIME_SHUFFLE_BATCH_WAIT_DEFAULT = -1;
+
 
   /**
    * Share data fetched between tasks running on the same host if applicable
@@ -619,6 +628,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL);
     tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS);
+    tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_BATCH_WAIT);
 
     defaultConf.addResource("core-default.xml");
     defaultConf.addResource("core-site.xml");
index 5f3693f..ba8592f 100644 (file)
@@ -26,6 +26,7 @@ import java.text.DecimalFormat;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -46,6 +48,8 @@ import java.util.concurrent.locks.ReentrantLock;
 import javax.crypto.SecretKey;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.runtime.api.TaskFailureType;
@@ -114,9 +118,30 @@ public class ShuffleManager implements FetcherCallback {
   @VisibleForTesting
   final ListeningExecutorService fetcherExecutor;
 
+  /**
+   * Executor for ReportCallable.
+   */
+  private ExecutorService reporterExecutor;
+
+  /**
+   * Lock to sync failedEvents.
+   */
+  private final ReentrantLock reportLock = new ReentrantLock();
+
+  /**
+   * Condition to wake up the thread notifying when events fail.
+   */
+  private final Condition reportCondition = reportLock.newCondition();
+
+  /**
+   * Events reporting fetcher failed.
+   */
+  private final HashMap<InputReadErrorEvent, Integer> failedEvents
+      = new HashMap<>();
+
   private final ListeningExecutorService schedulerExecutor;
   private final RunShuffleCallable schedulerCallable;
-  
+
   private final BlockingQueue<FetchedInput> completedInputs;
   private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
   @VisibleForTesting
@@ -151,6 +176,11 @@ public class ShuffleManager implements FetcherCallback {
   private final int ifileBufferSize;
   private final boolean ifileReadAhead;
   private final int ifileReadAheadLength;
+
+  /**
+   * Holds the time to wait for failures to batch them and send less events.
+   */
+  private final int maxTimeToWaitForReportMillis;
   
   private final String srcNameTrimmed;
 
@@ -199,7 +229,8 @@ public class ShuffleManager implements FetcherCallback {
     this.bytesShuffledToDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_DISK);
     this.bytesShuffledToMemCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_TO_MEM);
     this.bytesShuffledDirectDiskCounter = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_BYTES_DISK_DIRECT);
-  
+
+
     this.ifileBufferSize = bufferSize;
     this.ifileReadAhead = ifileReadAheadEnabled;
     this.ifileReadAheadLength = ifileReadAheadLength;
@@ -212,6 +243,10 @@ public class ShuffleManager implements FetcherCallback {
     this.verifyDiskChecksum = conf.getBoolean(
         TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM,
         TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT);
+    this.maxTimeToWaitForReportMillis = conf.getInt(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BATCH_WAIT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BATCH_WAIT_DEFAULT);
+
 
     this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
     this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
@@ -302,12 +337,63 @@ public class ShuffleManager implements FetcherCallback {
   public void run() throws IOException {
     Preconditions.checkState(inputManager != null, "InputManager must be configured");
 
+    if (maxTimeToWaitForReportMillis > 0) {
+      reporterExecutor = Executors.newSingleThreadExecutor(
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("ShuffleRunner {" + srcNameTrimmed + "}")
+              .build());
+      Future reporterFuture = reporterExecutor.submit(new ReporterCallable());
+    }
+
     ListenableFuture<Void> runShuffleFuture = schedulerExecutor.submit(schedulerCallable);
     Futures.addCallback(runShuffleFuture, new SchedulerFutureCallback());
     // Shutdown this executor once this task, and the callback complete.
     schedulerExecutor.shutdown();
   }
-  
+
+  private class ReporterCallable extends CallableWithNdc<Void> {
+    /**
+     * Measures if the batching interval has ended.
+     */
+    private final Clock clock;
+    ReporterCallable() {
+      clock = new MonotonicClock();
+    }
+
+    @Override
+    protected Void callInternal() throws Exception {
+      long nextReport = 0;
+      while (!isShutdown.get()) {
+        try {
+          reportLock.lock();
+          while (failedEvents.isEmpty()) {
+            boolean signaled = reportCondition.await(maxTimeToWaitForReportMillis,
+                TimeUnit.MILLISECONDS);
+          }
+
+          long currentTime = clock.getTime();
+          if (currentTime > nextReport) {
+            if (failedEvents.size() > 0) {
+              List<Event> failedEventsToSend = Lists.newArrayListWithCapacity(
+                  failedEvents.size());
+              for (InputReadErrorEvent key : failedEvents.keySet()) {
+                failedEventsToSend.add(InputReadErrorEvent
+                    .create(key.getDiagnostics(), key.getIndex(),
+                        key.getVersion(), failedEvents.get(key)));
+              }
+              inputContext.sendEvents(failedEventsToSend);
+              failedEvents.clear();
+              nextReport = currentTime + maxTimeToWaitForReportMillis;
+            }
+          }
+        } finally {
+          reportLock.unlock();
+        }
+      }
+      return null;
+    }
+  }
+
   private class RunShuffleCallable extends CallableWithNdc<Void> {
 
     private final Configuration conf;
@@ -804,18 +890,27 @@ public class ShuffleManager implements FetcherCallback {
     if (srcAttemptIdentifier == null) {
       reportFatalError(null, "Received fetchFailure for an unknown src (null)");
     } else {
-    InputReadErrorEvent readError = InputReadErrorEvent.create(
-        "Fetch failure while fetching from "
-            + TezRuntimeUtils.getTaskAttemptIdentifier(
-            inputContext.getSourceVertexName(),
-            srcAttemptIdentifier.getInputIdentifier(),
-            srcAttemptIdentifier.getAttemptNumber()),
-        srcAttemptIdentifier.getInputIdentifier(),
-        srcAttemptIdentifier.getAttemptNumber());
-
-    List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
-    failedEvents.add(readError);
-    inputContext.sendEvents(failedEvents);
+      InputReadErrorEvent readError = InputReadErrorEvent.create(
+          "Fetch failure while fetching from "
+              + TezRuntimeUtils.getTaskAttemptIdentifier(
+              inputContext.getSourceVertexName(),
+              srcAttemptIdentifier.getInputIdentifier(),
+              srcAttemptIdentifier.getAttemptNumber()),
+          srcAttemptIdentifier.getInputIdentifier(),
+          srcAttemptIdentifier.getAttemptNumber());
+      if (maxTimeToWaitForReportMillis > 0) {
+        try {
+          reportLock.lock();
+          failedEvents.merge(readError, 1, (a, b) -> a + b);
+          reportCondition.signal();
+        } finally {
+          reportLock.unlock();
+        }
+      } else {
+        List<Event> events = Lists.newArrayListWithCapacity(1);
+        events.add(readError);
+        inputContext.sendEvents(events);
+      }
     }
   }
   /////////////////// End of Methods from FetcherCallbackHandler
@@ -849,6 +944,10 @@ public class ShuffleManager implements FetcherCallback {
       if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
         this.schedulerExecutor.shutdownNow();
       }
+      if (this.reporterExecutor != null
+          && !this.reporterExecutor.isShutdown()) {
+        this.reporterExecutor.shutdownNow();
+      }
       if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
         this.fetcherExecutor.shutdownNow(); // Interrupts all running fetchers.
       }
index 103f83d..94f7f5a 100644 (file)
@@ -20,7 +20,9 @@ package org.apache.tez.runtime.library.common.shuffle.impl;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
@@ -35,6 +37,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -57,6 +60,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
@@ -67,8 +71,10 @@ import org.apache.tez.runtime.library.common.shuffle.InputHost;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -214,6 +220,64 @@ public class TestShuffleManager {
     verify(inputContext, atLeast(3)).notifyProgress();
   }
 
+  @Test (timeout = 200000)
+  public void testFetchFailed() throws Exception {
+    InputContext inputContext = createInputContext();
+    final ShuffleManager shuffleManager = spy(createShuffleManager(inputContext, 1));
+    Thread schedulerGetHostThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          shuffleManager.run();
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    InputAttemptIdentifier inputAttemptIdentifier
+        = new InputAttemptIdentifier(1, 1);
+
+    schedulerGetHostThread.start();
+    Thread.sleep(1000);
+    shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false);
+    Thread.sleep(1000);
+
+    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
+    verify(inputContext, times(1))
+        .sendEvents(captor.capture());
+    Assert.assertEquals("Size was: " + captor.getAllValues().size(),
+        captor.getAllValues().size(), 1);
+    List<Event> capturedList = captor.getAllValues().get(0);
+    Assert.assertEquals("Size was: " + capturedList.size(),
+        capturedList.size(), 1);
+    InputReadErrorEvent inputEvent = (InputReadErrorEvent)capturedList.get(0);
+    Assert.assertEquals("Number of failures was: " + inputEvent.getNumFailures(),
+        inputEvent.getNumFailures(), 1);
+
+    shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false);
+    shuffleManager.fetchFailed("host1", inputAttemptIdentifier, false);
+
+    Thread.sleep(1000);
+    verify(inputContext, times(1)).sendEvents(any());
+
+    // Wait more than five seconds for the batch to go out
+    Thread.sleep(5000);
+    captor = ArgumentCaptor.forClass(List.class);
+    verify(inputContext, times(2))
+        .sendEvents(captor.capture());
+    Assert.assertEquals("Size was: " + captor.getAllValues().size(),
+        captor.getAllValues().size(), 2);
+    capturedList = captor.getAllValues().get(1);
+    Assert.assertEquals("Size was: " + capturedList.size(),
+        capturedList.size(), 1);
+    inputEvent = (InputReadErrorEvent)capturedList.get(0);
+    Assert.assertEquals("Number of failures was: " + inputEvent.getNumFailures(),
+        inputEvent.getNumFailures(), 2);
+
+
+    schedulerGetHostThread.interrupt();
+  }
+
   private ShuffleManagerForTest createShuffleManager(
       InputContext inputContext, int expectedNumOfPhysicalInputs)
           throws IOException {
@@ -222,6 +286,8 @@ public class TestShuffleManager {
     doReturn(outDirs).when(inputContext).getWorkDirs();
     conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS,
         inputContext.getWorkDirs());
+    // 5 seconds
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BATCH_WAIT, 5000);
 
     DataOutputBuffer out = new DataOutputBuffer();
     Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(new JobTokenIdentifier(),