[SPARK-23182][CORE] Allow enabling TCP keep alive on the RPC connections
author    Petar Petrov <petar.petrov@leanplum.com>
date      Sun, 13 Jan 2019 19:39:12 +0000 (13:39 -0600)
committer Sean Owen <sean.owen@databricks.com>
date      Sun, 13 Jan 2019 19:39:12 +0000 (13:39 -0600)
## What changes were proposed in this pull request?

Make it possible for the master to enable TCP keep-alive on its RPC connections with clients.

## How was this patch tested?

Manually tested. Added the following to spark-defaults.conf:
```
spark.rpc.io.enableTcpKeepAlive  true
```

Observed the following on the Spark master:
```
$ netstat -town | grep 7077
tcp6       0      0 10.240.3.134:7077       10.240.1.25:42851       ESTABLISHED keepalive (6736.50/0/0)
tcp6       0      0 10.240.3.134:44911      10.240.3.134:7077       ESTABLISHED keepalive (4098.68/0/0)
tcp6       0      0 10.240.3.134:7077       10.240.3.134:44911      ESTABLISHED keepalive (4098.68/0/0)
```

This confirms that the keep-alive setting is taking effect.
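
The countdown in parentheses (e.g. 6736.50 seconds remaining) comes from the kernel's keep-alive timers, not from Spark. As an aside, on a Linux host these can be inspected with sysctl; the values below are the usual defaults (two hours of idle time before the first probe):
```
$ sysctl net.ipv4.tcp_keepalive_time net.ipv4.tcp_keepalive_intvl net.ipv4.tcp_keepalive_probes
net.ipv4.tcp_keepalive_time = 7200
net.ipv4.tcp_keepalive_intvl = 75
net.ipv4.tcp_keepalive_probes = 9
```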

It's currently possible to enable TCP keep-alive on the worker / executor connections, but not on the other RPC connections, and it's unclear to me why that is. Keep-alive matters most for the master, because it protects the master against suddenly departing workers / executors. In particular, it makes the master resilient when using preemptible worker VMs in GCE: GCE has shutdown scripts, but it doesn't guarantee they run, so workers often aren't shut down gracefully and their TCP connections linger on the master with nothing to close them. Enabling keep-alive lets the kernel detect these dead connections and clean them up.

This change enables keep-alive on server-side connections beyond the master's, but that shouldn't cause any harm.
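
Since the key is resolved through TransportConf's module-scoped getConfKey (see the diff below), other network modules could in principle opt in the same way. A hypothetical example, assuming the standard spark.<module>.io.* prefixing:
```
spark.rpc.io.enableTcpKeepAlive      true
spark.shuffle.io.enableTcpKeepAlive  true
```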

Closes #20512 from peshopetrov/master.

Authored-by: Petar Petrov <petar.petrov@leanplum.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
index a0ecde2..9b327d5 100644
@@ -126,6 +126,10 @@ public class TransportServer implements Closeable {
       bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
     }
 
+    if (conf.enableTcpKeepAlive()) {
+      bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+    }
+
     bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
       @Override
       protected void initChannel(SocketChannel ch) {
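
The childOption call above is standard Netty: a child option is applied to every accepted connection, and SO_KEEPALIVE asks the OS to probe idle peers and tear down dead connections. A minimal, self-contained sketch of the same pattern (the class name, port, and empty initializer are illustrative, not Spark code):
```
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class KeepAliveServerSketch {
  public static void main(String[] args) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      ServerBootstrap bootstrap = new ServerBootstrap()
          .group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class)
          // Applied to each accepted socket: the kernel sends keep-alive
          // probes on idle connections and closes them if the peer is gone.
          .childOption(ChannelOption.SO_KEEPALIVE, true)
          .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
              // Protocol handlers would be added to ch.pipeline() here.
            }
          });
      bootstrap.bind(7077).sync().channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }
}
```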
common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 89ee5ee..3628da6 100644
@@ -42,6 +42,7 @@ public class TransportConf {
   private final String SPARK_NETWORK_IO_RETRYWAIT_KEY;
   private final String SPARK_NETWORK_IO_LAZYFD_KEY;
   private final String SPARK_NETWORK_VERBOSE_METRICS;
+  private final String SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY;
 
   private final ConfigProvider conf;
 
@@ -64,6 +65,7 @@ public class TransportConf {
     SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
     SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
     SPARK_NETWORK_VERBOSE_METRICS = getConfKey("io.enableVerboseMetrics");
+    SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY = getConfKey("io.enableTcpKeepAlive");
   }
 
   public int getInt(String name, int defaultValue) {
@@ -174,6 +176,14 @@ public class TransportConf {
   }
 
   /**
+   * Whether to enable TCP keep-alive. If true, TCP keep-alives are enabled, which removes
+   * connections whose remote peer has silently gone away (e.g., a crashed host).
+   */
+  public boolean enableTcpKeepAlive() {
+    return conf.getBoolean(SPARK_NETWORK_IO_ENABLETCPKEEPALIVE_KEY, false);
+  }
+
+  /**
    * Maximum number of retries when binding to a port before giving up.
    */
   public int portMaxRetries() {
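
A hypothetical usage sketch of the new getter, assuming Spark's MapConfigProvider helper from network-common (the class name KeepAliveConfSketch is illustrative):
```
import java.util.Collections;

import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class KeepAliveConfSketch {
  public static void main(String[] args) {
    // "rpc" is the module name, so getConfKey("io.enableTcpKeepAlive")
    // resolves to "spark.rpc.io.enableTcpKeepAlive".
    TransportConf conf = new TransportConf("rpc", new MapConfigProvider(
        Collections.singletonMap("spark.rpc.io.enableTcpKeepAlive", "true")));

    System.out.println(conf.enableTcpKeepAlive()); // prints: true
  }
}
```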