HIVE-20841: LLAP: Make dynamic ports configurable (Prasanth Jayachandran reviewed...
authorPrasanth Jayachandran <prasanthj@apache.org>
Tue, 12 Feb 2019 08:28:09 +0000 (00:28 -0800)
committerPrasanth Jayachandran <prasanthj@apache.org>
Tue, 12 Feb 2019 08:28:09 +0000 (00:28 -0800)
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
llap-client/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java

index 2156ff1..4a86b0a 100644 (file)
@@ -4165,6 +4165,8 @@ public class HiveConf extends Configuration {
     LLAP_DAEMON_RPC_NUM_HANDLERS("hive.llap.daemon.rpc.num.handlers", 5,
       "Number of RPC handlers for LLAP daemon.", "llap.daemon.rpc.num.handlers"),
 
+    LLAP_PLUGIN_RPC_PORT("hive.llap.plugin.rpc.port", 0,
+      "Port to use for LLAP plugin rpc server"),
     LLAP_PLUGIN_RPC_NUM_HANDLERS("hive.llap.plugin.rpc.num.handlers", 1,
       "Number of RPC handlers for AM LLAP plugin endpoint."),
     LLAP_DAEMON_WORK_DIRS("hive.llap.daemon.work.dirs", "",
@@ -4338,6 +4340,8 @@ public class HiveConf extends Configuration {
       "Sleep duration (in milliseconds) to wait before retrying on error when obtaining a\n" +
       "connection to LLAP daemon from Tez AM.",
       "llap.task.communicator.connection.sleep-between-retries-millis"),
+    LLAP_TASK_UMBILICAL_SERVER_PORT("hive.llap.daemon.umbilical.port", 0,
+      "LLAP task umbilical server RPC port"),
     LLAP_DAEMON_WEB_PORT("hive.llap.daemon.web.port", 15002, "LLAP daemon web UI port.",
       "llap.daemon.service.port"),
     LLAP_DAEMON_WEB_SSL("hive.llap.daemon.web.ssl", false,
index 89cb6fb..a16c0af 100644 (file)
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.hadoop.ipc.RPC;
@@ -53,11 +54,14 @@ public class LlapTaskUmbilicalServer {
 
   public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers) throws IOException {
     jobTokenSecretManager = new JobTokenSecretManager();
-
+    int umbilicalPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT);
+    if (umbilicalPort <= 0) {
+      umbilicalPort = 0;
+    }
     server = new RPC.Builder(conf)
         .setProtocol(LlapTaskUmbilicalProtocol.class)
         .setBindAddress("0.0.0.0")
-        .setPort(0)
+        .setPort(umbilicalPort)
         .setInstance(umbilical)
         .setNumHandlers(numHandlers)
         .setSecretManager(jobTokenSecretManager).build();
index 0120bb6..dc10f22 100644 (file)
@@ -255,10 +255,14 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
       int numHandlers =
           HiveConf.getIntVar(conf, ConfVars.LLAP_TASK_COMMUNICATOR_LISTENER_THREAD_COUNT);
+      int umbilicalPort = HiveConf.getIntVar(conf, ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT);
+      if (umbilicalPort <= 0) {
+        umbilicalPort = 0;
+      }
       server = new RPC.Builder(conf)
           .setProtocol(LlapTaskUmbilicalProtocol.class)
           .setBindAddress("0.0.0.0")
-          .setPort(0)
+          .setPort(umbilicalPort)
           .setInstance(umbilical)
           .setNumHandlers(numHandlers)
           .setSecretManager(jobTokenSecretManager).build();
index 79bca60..560cbaa 100644 (file)
@@ -338,7 +338,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         serializedToken = jobIdForToken = null;
       }
       pluginEndpoint = new LlapPluginServerImpl(sm,
-          HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_NUM_HANDLERS), this);
+          HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_NUM_HANDLERS), this, HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_PORT));
     } else {
       serializedToken = jobIdForToken = null;
       pluginEndpoint = null;
index e9a011a..6e6785e 100644 (file)
@@ -42,13 +42,16 @@ public class LlapPluginServerImpl extends AbstractService implements LlapPluginP
   private final int numHandlers;
   private final LlapTaskSchedulerService parent;
   private final AtomicReference<InetSocketAddress> bindAddress = new AtomicReference<>();
+  private final int port;
 
   public LlapPluginServerImpl(SecretManager<JobTokenIdentifier> secretManager,
-      int numHandlers, LlapTaskSchedulerService parent) {
+      int numHandlers, LlapTaskSchedulerService parent, int port) {
     super("LlapPluginServerImpl");
     this.secretManager = secretManager;
     this.numHandlers = numHandlers;
     this.parent = parent;
+    this.port = port <= 0 ? 0 : port;
+    LOG.info("Llap plugin server using port: {} #handlers: {}", port, numHandlers);
   }
 
   @Override
@@ -63,7 +66,7 @@ public class LlapPluginServerImpl extends AbstractService implements LlapPluginP
     final Configuration conf = getConfig();
     final BlockingService daemonImpl =
         LlapPluginProtocolProtos.LlapPluginProtocol.newReflectiveBlockingService(this);
-    server = LlapUtil.startProtocolServer(0, numHandlers, bindAddress , conf, daemonImpl,
+    server = LlapUtil.startProtocolServer(port, numHandlers, bindAddress , conf, daemonImpl,
         LlapPluginProtocolPB.class, secretManager, new LlapPluginPolicyProvider(),
         ConfVars.LLAP_PLUGIN_ACL, ConfVars.LLAP_PLUGIN_ACL_DENY);
     LOG.info("Starting the plugin endpoint on port " + bindAddress.get().getPort());