GIRAPH-1213 4/head
authorMaja Kabiljo <majakabiljo@fb.com>
Thu, 29 Nov 2018 19:35:53 +0000 (11:35 -0800)
committerMaja Kabiljo <majakabiljo@fb.com>
Tue, 11 Dec 2018 23:31:18 +0000 (15:31 -0800)
closes #96

giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java

index 74011b9..64c9c04 100644 (file)
@@ -82,6 +82,7 @@ import io.netty.handler.codec.FixedLengthFrameDecoder;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.util.AttributeKey;
 /*end[HADOOP_NON_SECURE]*/
+import io.netty.util.concurrent.BlockingOperationException;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
 
@@ -755,7 +756,11 @@ public class NettyClient {
     int reconnectFailures = 0;
     while (reconnectFailures < maxConnectionFailures) {
       ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
-      ProgressableUtils.awaitChannelFuture(connectionFuture, context);
+      try {
+        ProgressableUtils.awaitChannelFuture(connectionFuture, context);
+      } catch (BlockingOperationException e) {
+        LOG.warn("getNextChannel: Failed connecting to " + remoteServer, e);
+      }
       if (connectionFuture.isSuccess()) {
         if (LOG.isInfoEnabled()) {
           LOG.info("getNextChannel: Connected to " + remoteServer + "!");
@@ -1052,7 +1057,8 @@ public class NettyClient {
               writeFuture.channel().isActive() +
               ", future done = " + writeFuture.isDone() + ", " +
               "success = " + writeFuture.isSuccess() + ", " +
-              "cause = " + writeFuture.cause();
+              "cause = " + writeFuture.cause() + ", " +
+              "channelId = " + writeFuture.channel().hashCode();
         }
         LOG.warn("checkRequestsForProblems: Problem with request id " +
             entry.getKey() + ", " + logMessage + ", " +
@@ -1080,6 +1086,11 @@ public class NettyClient {
         LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
       }
       writeRequestToChannel(requestInfo);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("checkRequestsForProblems: Request " + requestId +
+            " was resent through channelId=" +
+            requestInfo.getWriteFuture().channel().hashCode());
+      }
     }
     addedRequestIds.clear();
     addedRequestInfos.clear();
@@ -1147,8 +1158,11 @@ public class NettyClient {
     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
       @Override
       public boolean apply(RequestInfo requestInfo) {
-        return requestInfo.getDestinationAddress().equals(
-            channel.remoteAddress());
+        if (requestInfo.getWriteFuture() == null ||
+            requestInfo.getWriteFuture().channel() == null) {
+          return false;
+        }
+        return requestInfo.getWriteFuture().channel().equals(channel);
       }
     }, networkRequestsResentForChannelFailure, true);
   }
@@ -1163,7 +1177,8 @@ public class NettyClient {
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
       if (future.isDone() && !future.isSuccess()) {
-        LOG.error("Request failed", future.cause());
+        LOG.error("Channel failed channelId=" + future.channel().hashCode(),
+            future.cause());
         checkRequestsAfterChannelFailure(future.channel());
       }
     }
index 12dde3b..5c6f035 100644 (file)
@@ -106,8 +106,9 @@ public class ResponseClientHandler extends ChannelInboundHandlerAdapter {
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
     throws Exception {
-    LOG.warn("exceptionCaught: Channel failed with " +
-        "remote address " + ctx.channel().remoteAddress(), cause);
+    LOG.warn("exceptionCaught: Channel channelId=" +
+        ctx.channel().hashCode() + " failed with remote address " +
+        ctx.channel().remoteAddress(), cause);
   }
 }