GIRAPH-1205
author Maja Kabiljo <majakabiljo@fb.com>
Thu, 18 Oct 2018 21:29:38 +0000 (14:29 -0700)
committer Maja Kabiljo <majakabiljo@fb.com>
Thu, 18 Oct 2018 21:29:38 +0000 (14:29 -0700)
closes #88

giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java

index 51887fe..83dd7f5 100644 (file)
@@ -141,6 +141,9 @@ public class NettyClient {
   /** How many network requests were resent because channel failed */
   public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
       "Network requests resent for channel failure";
+  /** How many network requests were resent on connection or request failure */
+  public static final String NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME =
+      "Network requests resent for connection or request failure";
 
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyClient.class);
@@ -221,6 +224,8 @@ public class NettyClient {
   private final GiraphHadoopCounter networkRequestsResentForTimeout;
   /** How many network requests were resent because channel failed */
   private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
+  /** How many network requests were resent on connection or request failure */
+  private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
 
   /**
    * Only constructor
@@ -266,6 +271,10 @@ public class NettyClient {
         new GiraphHadoopCounter(context.getCounter(
             NETTY_COUNTERS_GROUP,
             NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
+    networkRequestsResentForConnectionFailure =
+      new GiraphHadoopCounter(context.getCounter(
+        NETTY_COUNTERS_GROUP,
+        NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));
 
     maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
     maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
@@ -984,14 +993,19 @@ public class NettyClient {
     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
       @Override
       public boolean apply(RequestInfo requestInfo) {
-        ChannelFuture writeFuture = requestInfo.getWriteFuture();
-        // If not connected anymore, request failed, or the request is taking
-        // too long, re-establish and resend
-        return (writeFuture != null && (!writeFuture.channel().isActive() ||
-            (writeFuture.isDone() && !writeFuture.isSuccess()))) ||
-            (requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
+        // If the request is taking too long, re-establish and resend
+        return requestInfo.getElapsedMsecs() > maxRequestMilliseconds;
       }
     }, networkRequestsResentForTimeout);
+    resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
+      @Override
+      public boolean apply(RequestInfo requestInfo) {
+        ChannelFuture writeFuture = requestInfo.getWriteFuture();
+        // If not connected anymore or request failed, re-establish and resend
+        return writeFuture != null && (!writeFuture.channel().isActive() ||
+            (writeFuture.isDone() && !writeFuture.isSuccess()));
+      }
+    }, networkRequestsResentForConnectionFailure);
   }
 
   /**