JIRA-1200
authorMaja Kabiljo <majakabiljo@fb.com>
Tue, 18 Sep 2018 19:25:34 +0000 (12:25 -0700)
committerMaja Kabiljo <majakabiljo@fb.com>
Tue, 18 Sep 2018 19:25:34 +0000 (12:25 -0700)
closes #83

giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java

index 2c38505..51887fe 100644 (file)
@@ -38,6 +38,7 @@ import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.BooleanConfOption;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.counters.GiraphHadoopCounter;
 import org.apache.giraph.function.Predicate;
 import org.apache.giraph.graph.TaskInfo;
 import org.apache.giraph.master.MasterInfo;
@@ -131,6 +132,16 @@ public class NettyClient {
   public static final AttributeKey<SaslNettyClient> SASL =
       AttributeKey.valueOf("saslNettyClient");
 /*end[HADOOP_NON_SECURE]*/
+
+  /** Group name for netty counters */
+  public static final String NETTY_COUNTERS_GROUP = "Netty counters";
+  /** How many network requests were resent because they took too long */
+  public static final String NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME =
+      "Network requests resent for timeout";
+  /** How many network requests were resent because channel failed */
+  public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
+      "Network requests resent for channel failure";
+
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyClient.class);
   /** Context used to report progress */
@@ -206,6 +217,11 @@ public class NettyClient {
   /** Flow control policy used */
   private final FlowControl flowControl;
 
+  /** How many network requests were resent because they took too long */
+  private final GiraphHadoopCounter networkRequestsResentForTimeout;
+  /** How many network requests were resent because channel failed */
+  private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
+
   /**
    * Only constructor
    *
@@ -242,6 +258,15 @@ public class NettyClient {
       flowControl = new NoOpFlowControl(this);
     }
 
+    networkRequestsResentForTimeout =
+        new GiraphHadoopCounter(context.getCounter(
+            NETTY_COUNTERS_GROUP,
+            NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME));
+    networkRequestsResentForChannelFailure =
+        new GiraphHadoopCounter(context.getCounter(
+            NETTY_COUNTERS_GROUP,
+            NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
+
     maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
     maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
     waitTimeBetweenConnectionRetriesMs =
@@ -966,17 +991,18 @@ public class NettyClient {
             (writeFuture.isDone() && !writeFuture.isSuccess()))) ||
             (requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
       }
-    });
+    }, networkRequestsResentForTimeout);
   }
 
   /**
    * Resend requests which satisfy predicate
-   *
-   * @param shouldResendRequestPredicate Predicate to use to check whether
+   *  @param shouldResendRequestPredicate Predicate to use to check whether
    *                                     request should be resent
+   * @param counter Counter to increment for every resent network request
    */
   private void resendRequestsWhenNeeded(
-      Predicate<RequestInfo> shouldResendRequestPredicate) {
+      Predicate<RequestInfo> shouldResendRequestPredicate,
+      GiraphHadoopCounter counter) {
     // Check if there are open requests which have been sent a long time ago,
     // and if so, resend them.
     List<ClientRequestId> addedRequestIds = Lists.newArrayList();
@@ -1006,6 +1032,7 @@ public class NettyClient {
         addedRequestIds.add(entry.getKey());
         addedRequestInfos.add(new RequestInfo(
             requestInfo.getDestinationAddress(), requestInfo.getRequest()));
+        counter.increment();
       }
     }
 
@@ -1093,7 +1120,7 @@ public class NettyClient {
         return requestInfo.getDestinationAddress().equals(
             channel.remoteAddress());
       }
-    });
+    }, networkRequestsResentForChannelFailure);
   }
 
   /**