HIVE-21149 : Refactor LlapServiceDriver (Miklos Gergely via Ashutosh Chauhan)
author: Miklos Gergely <mgergely@hortonworks.com>
Tue, 22 Jan 2019 23:16:00 +0000 (15:16 -0800)
committer: Ashutosh Chauhan <hashutosh@apache.org>
Wed, 13 Feb 2019 19:20:39 +0000 (11:20 -0800)
Signed-off-by: Ashutosh Chauhan <hashutosh@apache.org>
16 files changed:
bin/ext/llap.sh
llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java [deleted file]
llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java [deleted file]
llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java
llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java [new file with mode: 0644]
llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyConfigs.java [new file with mode: 0644]
llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyLocalJars.java [new file with mode: 0644]
llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCreateUdfFile.java [new file with mode: 0644]
llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskDownloadTezJars.java [new file with mode: 0644]
llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapConfigJsonCreator.java [new file with mode: 0644]
llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceCommandLine.java [new file with mode: 0644]
llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceDriver.java [new file with mode: 0644]
llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapTarComponentGatherer.java [new file with mode: 0644]
llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/package-info.java [new file with mode: 0644]
llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/TestLlapServiceCommandLine.java [new file with mode: 0644]
llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/package-info.java [new file with mode: 0644]

diff --git a/bin/ext/llap.sh b/bin/ext/llap.sh
index 91a54b3..3eb1573 100644
--- a/bin/ext/llap.sh
+++ b/bin/ext/llap.sh
@@ -18,7 +18,7 @@ export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
 
 llap () {
   TMPDIR=$(mktemp -d /tmp/staging-yarn-XXXXXX)
-  CLASS=org.apache.hadoop.hive.llap.cli.LlapServiceDriver;
+  CLASS=org.apache.hadoop.hive.llap.cli.service.LlapServiceDriver;
   if [ ! -f ${HIVE_LIB}/hive-cli-*.jar ]; then
     echo "Missing Hive CLI Jar"
     exit 3;
@@ -44,7 +44,7 @@ llap () {
 }
 
 llap_help () {
-  CLASS=org.apache.hadoop.hive.llap.cli.LlapServiceDriver;
+  CLASS=org.apache.hadoop.hive.llap.cli.service.LlapServiceDriver;
   execHiveCmd $CLASS "--help"
 } 
 
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
deleted file mode 100644
index 2445075..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.cli;
-
-import com.google.common.base.Preconditions;
-import jline.TerminalFactory;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import javax.annotation.Nonnull;
-
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.log.LogHelpers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.util.StringUtils;
-
-public class LlapOptionsProcessor {
-
-  public static final String OPTION_INSTANCES = "instances"; //forward as arg
-  public static final String OPTION_NAME = "name"; // forward as arg
-  public static final String OPTION_DIRECTORY = "directory"; // work-dir
-  public static final String OPTION_EXECUTORS = "executors"; // llap-daemon-site
-  public static final String OPTION_CACHE = "cache"; // llap-daemon-site
-  public static final String OPTION_SIZE = "size"; // forward via config.json
-  public static final String OPTION_XMX = "xmx"; // forward as arg
-  public static final String OPTION_AUXJARS = "auxjars"; // used to localize jars
-  public static final String OPTION_AUXHIVE = "auxhive"; // used to localize jars
-  public static final String OPTION_AUXHBASE = "auxhbase"; // used to localize jars
-  public static final String OPTION_JAVA_HOME = "javaHome"; // forward via config.json
-  public static final String OPTION_HIVECONF = "hiveconf"; // llap-daemon-site if relevant parameter
-  public static final String OPTION_SERVICE_AM_CONTAINER_MB = "service-am-container-mb"; // forward as arg
-  public static final String OPTION_SERVICE_APPCONFIG_GLOBAL = "service-appconfig-global"; // forward as arg
-  public static final String OPTION_LLAP_QUEUE = "queue"; // forward via config.json
-  public static final String OPTION_IO_THREADS = "iothreads"; // llap-daemon-site
-
-  // Options for the python script that are here because our option parser cannot ignore the unknown ones
-  public static final String OPTION_ARGS = "args"; // forward as arg
-  public static final String OPTION_LOGLEVEL = "loglevel"; // forward as arg
-  public static final String OPTION_LOGGER = "logger"; // forward as arg
-  public static final String OPTION_SERVICE_KEYTAB_DIR = "service-keytab-dir";
-  public static final String OPTION_SERVICE_KEYTAB = "service-keytab";
-  public static final String OPTION_SERVICE_PRINCIPAL = "service-principal";
-  public static final String OPTION_SERVICE_PLACEMENT = "service-placement";
-  public static final String OPTION_SERVICE_DEFAULT_KEYTAB = "service-default-keytab";
-  public static final String OPTION_OUTPUT_DIR = "output";
-  public static final String OPTION_START = "startImmediately";
-  public static final String OPTION_HEALTH_PERCENT = "health-percent";
-  public static final String OPTION_HEALTH_TIME_WINDOW_SECS = "health-time-window-secs";
-  public static final String OPTION_HEALTH_INIT_DELAY_SECS = "health-init-delay-secs";
-
-  public static class LlapOptions {
-    private final int instances;
-    private final String directory;
-    private final String name;
-    private final int executors;
-    private final int ioThreads;
-    private final long cache;
-    private final long size;
-    private final long xmx;
-    private final String jars;
-    private final boolean isHbase;
-    private final Properties conf;
-    private final String javaPath;
-    private final String llapQueueName;
-    private final String logger;
-    private final boolean isStarting;
-    private final String output;
-    private final boolean isHiveAux;
-
-    public LlapOptions(String name, int instances, String directory, int executors, int ioThreads,
-        long cache, long size, long xmx, String jars, boolean isHbase,
-        @Nonnull Properties hiveconf, String javaPath, String llapQueueName, String logger,
-        boolean isStarting, String output, boolean isHiveAux) throws ParseException {
-      if (instances <= 0) {
-        throw new ParseException("Invalid configuration: " + instances
-            + " (should be greater than 0)");
-      }
-      this.instances = instances;
-      this.directory = directory;
-      this.name = name;
-      this.executors = executors;
-      this.ioThreads = ioThreads;
-      this.cache = cache;
-      this.size = size;
-      this.xmx = xmx;
-      this.jars = jars;
-      this.isHbase = isHbase;
-      this.isHiveAux = isHiveAux;
-      this.conf = hiveconf;
-      this.javaPath = javaPath;
-      this.llapQueueName = llapQueueName;
-      this.logger = logger;
-      this.isStarting = isStarting;
-      this.output = output;
-    }
-
-    public String getOutput() {
-      return output;
-    }
-
-    public String getName() {
-      return name;
-    }
-
-    public int getInstances() {
-      return instances;
-    }
-
-    public String getDirectory() {
-      return directory;
-    }
-
-    public int getExecutors() {
-      return executors;
-    }
-
-    public int getIoThreads() {
-      return ioThreads;
-    }
-
-    public long getCache() {
-      return cache;
-    }
-
-    public long getSize() {
-      return size;
-    }
-
-    public long getXmx() {
-      return xmx;
-    }
-
-    public String getAuxJars() {
-      return jars;
-    }
-
-    public boolean getIsHBase() {
-      return isHbase;
-    }
-
-    public boolean getIsHiveAux() {
-      return isHiveAux;
-    }
-
-    public Properties getConfig() {
-      return conf;
-    }
-
-    public String getJavaPath() {
-      return javaPath;
-    }
-
-    public String getLlapQueueName() {
-      return llapQueueName;
-    }
-
-    public String getLogger() {
-      return logger;
-    }
-
-    public boolean isStarting() {
-      return isStarting;
-    }
-  }
-
-  protected static final Logger l4j = LoggerFactory.getLogger(LlapOptionsProcessor.class.getName());
-  private final Options options = new Options();
-  private org.apache.commons.cli.CommandLine commandLine;
-
-  @SuppressWarnings("static-access")
-  public LlapOptionsProcessor() {
-
-    // set the number of instances on which llap should run
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_INSTANCES).withLongOpt(OPTION_INSTANCES)
-        .withDescription("Specify the number of instances to run this on").create('i'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_NAME).withLongOpt(OPTION_NAME)
-        .withDescription("Cluster name for YARN registry").create('n'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_DIRECTORY).withLongOpt(OPTION_DIRECTORY)
-        .withDescription("Temp directory for jars etc.").create('d'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_ARGS).withLongOpt(OPTION_ARGS)
-        .withDescription("java arguments to the llap instance").create('a'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_LOGLEVEL).withLongOpt(OPTION_LOGLEVEL)
-        .withDescription("log levels for the llap instance").create('l'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_LOGGER).withLongOpt(OPTION_LOGGER)
-        .withDescription(
-            "logger for llap instance ([" + LogHelpers.LLAP_LOGGER_NAME_RFA + "], " +
-                LogHelpers.LLAP_LOGGER_NAME_QUERY_ROUTING + ", " + LogHelpers.LLAP_LOGGER_NAME_CONSOLE)
-        .create());
-
-    options.addOption(OptionBuilder.hasArg(false).withArgName(OPTION_SERVICE_DEFAULT_KEYTAB).withLongOpt(OPTION_SERVICE_DEFAULT_KEYTAB)
-        .withDescription("try to set default settings for Service AM keytab; mostly for dev testing").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SERVICE_KEYTAB_DIR).withLongOpt(OPTION_SERVICE_KEYTAB_DIR)
-        .withDescription("Service AM keytab directory on HDFS (where the headless user keytab is stored by Service keytab installation, e.g. .yarn/keytabs/llap)").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SERVICE_KEYTAB).withLongOpt(OPTION_SERVICE_KEYTAB)
-        .withDescription("Service AM keytab file name inside " + OPTION_SERVICE_KEYTAB_DIR).create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SERVICE_PRINCIPAL).withLongOpt(OPTION_SERVICE_PRINCIPAL)
-        .withDescription("Service AM principal; should be the user running the cluster, e.g. hive@EXAMPLE.COM").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SERVICE_PLACEMENT).withLongOpt(OPTION_SERVICE_PLACEMENT)
-        .withDescription("Service placement policy; see YARN documentation at https://issues.apache.org/jira/browse/YARN-1042."
-          + " This is unnecessary if LLAP is going to take more than half of the YARN capacity of a node.").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_HEALTH_PERCENT).withLongOpt(OPTION_HEALTH_PERCENT)
-      .withDescription("Percentage of running containers after which LLAP application is considered healthy" +
-        " (Default: 80)").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_HEALTH_INIT_DELAY_SECS)
-      .withLongOpt(OPTION_HEALTH_INIT_DELAY_SECS)
-      .withDescription("Delay in seconds after which health percentage is monitored (Default: 400)").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_HEALTH_TIME_WINDOW_SECS)
-      .withLongOpt(OPTION_HEALTH_TIME_WINDOW_SECS)
-      .withDescription("Time window in seconds (after initial delay) for which LLAP application is allowed to be in " +
-        "unhealthy state before being killed (Default: 300)").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_EXECUTORS).withLongOpt(OPTION_EXECUTORS)
-        .withDescription("executor per instance").create('e'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_CACHE).withLongOpt(OPTION_CACHE)
-        .withDescription("cache size per instance").create('c'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SIZE).withLongOpt(OPTION_SIZE)
-        .withDescription("container size per instance").create('s'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_XMX).withLongOpt(OPTION_XMX)
-        .withDescription("working memory size").create('w'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_LLAP_QUEUE)
-        .withLongOpt(OPTION_LLAP_QUEUE)
-        .withDescription("The queue within which LLAP will be started").create('q'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_OUTPUT_DIR)
-        .withLongOpt(OPTION_OUTPUT_DIR)
-        .withDescription("Output directory for the generated scripts").create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_AUXJARS).withLongOpt(OPTION_AUXJARS)
-        .withDescription("additional jars to package (by default, JSON SerDe jar is packaged"
-            + " if available)").create('j'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_AUXHBASE).withLongOpt(OPTION_AUXHBASE)
-        .withDescription("whether to package the HBase jars (true by default)").create('h'));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_AUXHIVE).withLongOpt(OPTION_AUXHIVE)
-        .withDescription("whether to package the Hive aux jars (true by default)").create(OPTION_AUXHIVE));
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_JAVA_HOME).withLongOpt(OPTION_JAVA_HOME)
-        .withDescription(
-            "Path to the JRE/JDK. This should be installed at the same location on all cluster nodes ($JAVA_HOME, java.home by default)")
-        .create());
-
-    // -hiveconf x=y
-    options.addOption(OptionBuilder.withValueSeparator().hasArgs(2).withArgName("property=value")
-        .withLongOpt(OPTION_HIVECONF)
-        .withDescription("Use value for given property. Overridden by explicit parameters")
-        .create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName("b")
-        .withLongOpt(OPTION_SERVICE_AM_CONTAINER_MB)
-        .withDescription("The size of the service AppMaster container in MB").create('b'));
-
-    options.addOption(OptionBuilder.withValueSeparator().hasArgs(2).withArgName("property=value")
-        .withLongOpt(OPTION_SERVICE_APPCONFIG_GLOBAL)
-        .withDescription("Property (key=value) to be set in the global section of the Service appConfig")
-        .create());
-
-    options.addOption(OptionBuilder.hasArg().withArgName(OPTION_IO_THREADS)
-        .withLongOpt(OPTION_IO_THREADS).withDescription("executor per instance").create('t'));
-
-    options.addOption(OptionBuilder.hasArg(false).withArgName(OPTION_START)
-        .withLongOpt(OPTION_START).withDescription("immediately start the cluster")
-        .create('z'));
-
-    // [-H|--help]
-    options.addOption(new Option("H", "help", false, "Print help information"));
-  }
-
-  private static long parseSuffixed(String value) {
-    return StringUtils.TraditionalBinaryPrefix.string2long(value);
-  }
-
-  public LlapOptions processOptions(String argv[]) throws ParseException, IOException {
-    commandLine = new GnuParser().parse(options, argv);
-    if (commandLine.hasOption('H') || false == commandLine.hasOption("instances")) {
-      // needs at least --instances
-      printUsage();
-      return null;
-    }
-
-    int instances = Integer.parseInt(commandLine.getOptionValue(OPTION_INSTANCES));
-    String directory = commandLine.getOptionValue(OPTION_DIRECTORY);
-    String jars = commandLine.getOptionValue(OPTION_AUXJARS);
-
-    String name = commandLine.getOptionValue(OPTION_NAME, null);
-
-    final int executors = Integer.parseInt(commandLine.getOptionValue(OPTION_EXECUTORS, "-1"));
-    final int ioThreads = Integer.parseInt(
-        commandLine.getOptionValue(OPTION_IO_THREADS, Integer.toString(executors)));
-    final long cache = parseSuffixed(commandLine.getOptionValue(OPTION_CACHE, "-1"));
-    final long size = parseSuffixed(commandLine.getOptionValue(OPTION_SIZE, "-1"));
-    final long xmx = parseSuffixed(commandLine.getOptionValue(OPTION_XMX, "-1"));
-    final boolean isHbase = Boolean.parseBoolean(
-        commandLine.getOptionValue(OPTION_AUXHBASE, "true"));
-    final boolean isHiveAux = Boolean.parseBoolean(
-        commandLine.getOptionValue(OPTION_AUXHIVE, "true"));
-    final boolean doStart = commandLine.hasOption(OPTION_START);
-    final String output = commandLine.getOptionValue(OPTION_OUTPUT_DIR, null);
-
-    final String queueName = commandLine.getOptionValue(OPTION_LLAP_QUEUE,
-        HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.getDefaultValue());
-
-    final Properties hiveconf;
-
-    if (commandLine.hasOption(OPTION_HIVECONF)) {
-      hiveconf = commandLine.getOptionProperties(OPTION_HIVECONF);
-    } else {
-      hiveconf = new Properties();
-    }
-
-    String javaHome = null;
-    if (commandLine.hasOption(OPTION_JAVA_HOME)) {
-      javaHome = commandLine.getOptionValue(OPTION_JAVA_HOME);
-    }
-
-    String logger = null;
-    if (commandLine.hasOption(OPTION_LOGGER)) {
-      logger = commandLine.getOptionValue(OPTION_LOGGER);
-      Preconditions.checkArgument(
-          logger.equalsIgnoreCase(LogHelpers.LLAP_LOGGER_NAME_QUERY_ROUTING) ||
-              logger.equalsIgnoreCase(LogHelpers.LLAP_LOGGER_NAME_CONSOLE) ||
-              logger.equalsIgnoreCase(LogHelpers.LLAP_LOGGER_NAME_RFA));
-    }
-
-    // loglevel & args are parsed by the python processor
-
-    return new LlapOptions(name, instances, directory, executors, ioThreads, cache, size, xmx,
-        jars, isHbase, hiveconf, javaHome, queueName, logger, doStart, output, isHiveAux);
-  }
-
-  private void printUsage() {
-    HelpFormatter hf = new HelpFormatter();
-    try {
-      int width = hf.getWidth();
-      int jlineWidth = TerminalFactory.get().getWidth();
-      width = Math.min(160, Math.max(jlineWidth, width)); // Ignore potentially incorrect values
-      hf.setWidth(width);
-    } catch (Throwable t) { // Ignore
-    }
-    hf.printHelp("llap", options);
-  }
-}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
deleted file mode 100644
index 4bc2431..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ /dev/null
@@ -1,839 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.cli;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hive.common.CompressionUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.LlapUtil;
-import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor.LlapOptions;
-import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
-import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
-import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.ResourceUri;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.util.ResourceDownloader;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.eclipse.jetty.rewrite.handler.Rule;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-public class LlapServiceDriver {
-  protected static final Logger LOG = LoggerFactory.getLogger(LlapServiceDriver.class.getName());
-
-  private static final String[]
-      DEFAULT_AUX_CLASSES =
-      new String[] { "org.apache.hive.hcatalog.data.JsonSerDe", "org.apache.hadoop.hive.druid.DruidStorageHandler",
-          "org.apache.hive.storage.jdbc.JdbcStorageHandler", "org.apache.commons.dbcp.BasicDataSourceFactory",
-          "org.apache.commons.pool.impl.GenericObjectPool", "org.apache.hadoop.hive.kafka.KafkaStorageHandler" };
-  private static final String HBASE_SERDE_CLASS = "org.apache.hadoop.hive.hbase.HBaseSerDe";
-  private static final String[] NEEDED_CONFIGS = LlapDaemonConfiguration.DAEMON_CONFIGS;
-  private static final String[] OPTIONAL_CONFIGS = LlapDaemonConfiguration.SSL_DAEMON_CONFIGS;
-  private static final String OUTPUT_DIR_PREFIX = "llap-yarn-";
-
-  // This is not a config that users set in hive-site. Its only use is to share information
-  // between the java component of the service driver and the python component.
-  private static final String CONFIG_CLUSTER_NAME = "private.hive.llap.servicedriver.cluster.name";
-
-  /**
-   * This is a working configuration for the instance to merge various variables.
-   * It is not written out for llap server usage.
-   */
-  private final HiveConf conf;
-
-  public LlapServiceDriver() {
-    SessionState ss = SessionState.get();
-    conf = (ss != null) ? ss.getConf() : new HiveConf(SessionState.class);
-  }
-
-  public static void main(String[] args) throws Exception {
-    LOG.info("LLAP service driver invoked with arguments={}", args);
-    int ret = 0;
-    try {
-      ret = new LlapServiceDriver().run(args);
-    } catch (Throwable t) {
-      System.err.println("Failed: " + t.getMessage());
-      t.printStackTrace();
-      ret = 3;
-    } finally {
-      LOG.info("LLAP service driver finished");
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Completed processing - exiting with " + ret);
-    }
-    System.exit(ret);
-  }
-
-
-  private static Configuration resolve(Configuration configured, Properties direct,
-                                       Properties hiveconf) {
-    Configuration conf = new Configuration(false);
-
-    populateConf(configured, conf, hiveconf, "CLI hiveconf");
-    populateConf(configured, conf, direct, "CLI direct");
-
-    return conf;
-  }
-
-  private static void populateConf(Configuration configured, Configuration target,
-                                   Properties properties, String source) {
-    for (Entry<Object, Object> entry : properties.entrySet()) {
-      String key = (String) entry.getKey();
-      String val = configured.get(key);
-      if (val != null) {
-        target.set(key, val, source);
-      }
-    }
-  }
-
-  static void populateConfWithLlapProperties(Configuration conf, Properties properties) {
-    for(Entry<Object, Object> props : properties.entrySet()) {
-      String key = (String) props.getKey();
-      if (HiveConf.getLlapDaemonConfVars().contains(key)) {
-        conf.set(key, (String) props.getValue());
-      } else {
-        if (key.startsWith(HiveConf.PREFIX_LLAP) || key.startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
-          LOG.warn("Adding key [{}] even though it is not in the set of known llap-server keys", key);
-          conf.set(key, (String) props.getValue());
-        } else {
-          LOG.warn("Ignoring unknown llap server parameter: [{}]", key);
-        }
-      }
-    }
-  }
-
-  private static abstract class NamedCallable<T> implements Callable<T> {
-    public final String taskName;
-    public NamedCallable (String name) {
-      this.taskName = name;
-    }
-    public String getName() {
-      return taskName;
-    }
-  }
-
-  private int run(String[] args) throws Exception {
-    LlapOptionsProcessor optionsProcessor = new LlapOptionsProcessor();
-    final LlapOptions options = optionsProcessor.processOptions(args);
-
-    final Properties propsDirectOptions = new Properties();
-
-    if (options == null) {
-      // help
-      return 1;
-    }
-
-    // Working directory.
-    Path tmpDir = new Path(options.getDirectory());
-
-    if (conf == null) {
-      throw new Exception("Cannot load any configuration to run command");
-    }
-
-    final long t0 = System.nanoTime();
-
-    final FileSystem fs = FileSystem.get(conf);
-    final FileSystem lfs = FileSystem.getLocal(conf).getRawFileSystem();
-
-    int threadCount = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
-    final ExecutorService executor = Executors.newFixedThreadPool(threadCount,
-            new ThreadFactoryBuilder().setNameFormat("llap-pkg-%d").build());
-    final CompletionService<Void> asyncRunner = new ExecutorCompletionService<Void>(executor);
-
-    int rc = 0;
-    try {
-
-      // needed so that the file is actually loaded into configuration.
-      for (String f : NEEDED_CONFIGS) {
-        conf.addResource(f);
-        if (conf.getResource(f) == null) {
-          throw new Exception("Unable to find required config file: " + f);
-        }
-      }
-      for (String f : OPTIONAL_CONFIGS) {
-        conf.addResource(f);
-      }
-
-      conf.reloadConfiguration();
-
-      populateConfWithLlapProperties(conf, options.getConfig());
-
-      if (options.getName() != null) {
-        // update service registry configs - caveat: this has nothing to do with the actual settings
-        // as read by the AM
-        // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between
-        // instances
-        conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + options.getName());
-        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
-            "@" + options.getName());
-      }
-
-      if (options.getLogger() != null) {
-        HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, options.getLogger());
-        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname, options.getLogger());
-      }
-      boolean isDirect = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT);
-
-      if (options.getSize() != -1) {
-        if (options.getCache() != -1) {
-          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false) {
-            // direct heap allocations need to be safer
-            Preconditions.checkArgument(options.getCache() < options.getSize(), "Cache size ("
-                + LlapUtil.humanReadableByteCount(options.getCache()) + ") has to be smaller"
-                + " than the container sizing (" + LlapUtil.humanReadableByteCount(options.getSize()) + ")");
-          } else if (options.getCache() < options.getSize()) {
-            LOG.warn("Note that this might need YARN physical memory monitoring to be turned off "
-                + "(yarn.nodemanager.pmem-check-enabled=false)");
-          }
-        }
-        if (options.getXmx() != -1) {
-          Preconditions.checkArgument(options.getXmx() < options.getSize(), "Working memory (Xmx="
-              + LlapUtil.humanReadableByteCount(options.getXmx()) + ") has to be"
-              + " smaller than the container sizing (" + LlapUtil.humanReadableByteCount(options.getSize())
-              + ")");
-        }
-        if (isDirect && !HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
-          // direct and not memory mapped
-          Preconditions.checkArgument(options.getXmx() + options.getCache() <= options.getSize(),
-            "Working memory (Xmx=" + LlapUtil.humanReadableByteCount(options.getXmx()) + ") + cache size ("
-              + LlapUtil.humanReadableByteCount(options.getCache()) + ") has to be smaller than the container sizing ("
-              + LlapUtil.humanReadableByteCount(options.getSize()) + ")");
-        }
-      }
-
-
-      if (options.getExecutors() != -1) {
-        conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors());
-        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
-            String.valueOf(options.getExecutors()));
-        // TODO: vcpu settings - possibly when DRFA works right
-      }
-
-      if (options.getIoThreads() != -1) {
-        conf.setLong(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname, options.getIoThreads());
-        propsDirectOptions.setProperty(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname,
-            String.valueOf(options.getIoThreads()));
-      }
-
-      long cache = -1, xmx = -1;
-      if (options.getCache() != -1) {
-        cache = options.getCache();
-        conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
-        propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
-            Long.toString(cache));
-      }
-
-      if (options.getXmx() != -1) {
-        // Needs more explanation here
-        // Xmx is not the max heap value in JDK8. You need to subtract 50% of the survivor fraction
-        // from this, to get actual usable memory before it goes into GC
-        xmx = options.getXmx();
-        long xmxMb = (xmx / (1024L * 1024L));
-        conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmxMb);
-        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
-            String.valueOf(xmxMb));
-      }
-
-      long size = options.getSize();
-      if (size == -1) {
-        long heapSize = xmx;
-        if (!isDirect) {
-          heapSize += cache;
-        }
-        size = Math.min((long)(heapSize * 1.2), heapSize + 1024L*1024*1024);
-        if (isDirect) {
-          size += cache;
-        }
-      }
-      long containerSize = size / (1024 * 1024);
-      final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
-      Preconditions.checkArgument(containerSize >= minAlloc, "Container size ("
-          + LlapUtil.humanReadableByteCount(options.getSize()) + ") should be greater"
-          + " than minimum allocation(" + LlapUtil.humanReadableByteCount(minAlloc * 1024L * 1024L) + ")");
-      conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
-          String.valueOf(containerSize));
-
-      LOG.info("Memory settings: container memory: {} executor memory: {} cache memory: {}",
-        LlapUtil.humanReadableByteCount(options.getSize()),
-        LlapUtil.humanReadableByteCount(options.getXmx()),
-        LlapUtil.humanReadableByteCount(options.getCache()));
-
-      if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty()) {
-        conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
-        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
-            options.getLlapQueueName());
-      }
-
-      final URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
-
-      if (null == logger) {
-        throw new Exception("Unable to find required config file: llap-daemon-log4j2.properties");
-      }
-
-      Path home = new Path(System.getenv("HIVE_HOME"));
-      Path scriptParent = new Path(new Path(home, "scripts"), "llap");
-      Path scripts = new Path(scriptParent, "bin");
-
-      if (!lfs.exists(home)) {
-        throw new Exception("Unable to find HIVE_HOME:" + home);
-      } else if (!lfs.exists(scripts)) {
-        LOG.warn("Unable to find llap scripts:" + scripts);
-      }
-
-      final Path libDir = new Path(tmpDir, "lib");
-      final Path tezDir = new Path(libDir, "tez");
-      final Path udfDir = new Path(libDir, "udfs");
-      final Path confPath = new Path(tmpDir, "conf");
-      if (!lfs.mkdirs(confPath)) {
-        LOG.warn("mkdirs for " + confPath + " returned false");
-      }
-      if (!lfs.mkdirs(tezDir)) {
-        LOG.warn("mkdirs for " + tezDir + " returned false");
-      }
-      if (!lfs.mkdirs(udfDir)) {
-        LOG.warn("mkdirs for " + udfDir + " returned false");
-      }
-
-      NamedCallable<Void> downloadTez = new NamedCallable<Void>("downloadTez") {
-        @Override
-        public Void call() throws Exception {
-          synchronized (fs) {
-            String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS);
-            if (tezLibs == null) {
-              LOG.warn("Missing tez.lib.uris in tez-site.xml");
-            }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Copying tez libs from " + tezLibs);
-            }
-            lfs.mkdirs(tezDir);
-            fs.copyToLocalFile(new Path(tezLibs), new Path(libDir, "tez.tar.gz"));
-            CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), tezDir.toString(),
-                true);
-            lfs.delete(new Path(libDir, "tez.tar.gz"), false);
-          }
-          return null;
-        }
-      };
-
-      NamedCallable<Void> copyLocalJars = new NamedCallable<Void>("copyLocalJars") {
-        @Override
-        public Void call() throws Exception {
-          Class<?>[] dependencies = new Class<?>[] { LlapDaemonProtocolProtos.class, // llap-common
-              LlapTezUtils.class, // llap-tez
-              LlapInputFormat.class, // llap-server
-              HiveInputFormat.class, // hive-exec
-              SslContextFactory.class, // hive-common (https deps)
-              Rule.class, // Jetty rewrite class
-              RegistryUtils.ServiceRecordMarshal.class, // ZK registry
-              // log4j2
-              com.lmax.disruptor.RingBuffer.class, // disruptor
-              org.apache.logging.log4j.Logger.class, // log4j-api
-              org.apache.logging.log4j.core.Appender.class, // log4j-core
-              org.apache.logging.slf4j.Log4jLogger.class, // log4j-slf4j
-              // log4j-1.2-API needed for NDC
-              org.apache.log4j.config.Log4j1ConfigurationFactory.class,
-              io.netty.util.NetUtil.class, // netty4
-              org.jboss.netty.util.NetUtil.class, //netty3
-              org.apache.arrow.vector.types.pojo.ArrowType.class, //arrow-vector
-              org.apache.arrow.memory.BaseAllocator.class, //arrow-memory
-              org.apache.arrow.flatbuf.Schema.class, //arrow-format
-              com.google.flatbuffers.Table.class, //flatbuffers
-              com.carrotsearch.hppc.ByteArrayDeque.class //hppc
-              };
-
-          for (Class<?> c : dependencies) {
-            Path jarPath = new Path(Utilities.jarFinderGetJar(c));
-            lfs.copyFromLocalFile(jarPath, libDir);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Copying " + jarPath + " to " + libDir);
-            }
-          }
-          return null;
-        }
-      };
-
-      // copy default aux classes (json/hbase)
-
-      NamedCallable<Void> copyAuxJars = new NamedCallable<Void>("copyAuxJars") {
-        @Override
-        public Void call() throws Exception {
-          for (String className : DEFAULT_AUX_CLASSES) {
-            localizeJarForClass(lfs, libDir, className, false);
-          }
-          Collection<String> codecs = conf.getStringCollection("io.compression.codecs");
-          if (codecs != null) {
-            for (String codecClassName : codecs) {
-              localizeJarForClass(lfs, libDir, codecClassName, false);
-            }
-          }
-          for (String className : getDbSpecificJdbcJars()) {
-            localizeJarForClass(lfs, libDir, className, false);
-          }
-          if (options.getIsHBase()) {
-            try {
-              localizeJarForClass(lfs, libDir, HBASE_SERDE_CLASS, true);
-              Job fakeJob = new Job(new JobConf()); // HBase API is convoluted.
-              TableMapReduceUtil.addDependencyJars(fakeJob);
-              Collection<String> hbaseJars =
-                  fakeJob.getConfiguration().getStringCollection("tmpjars");
-              for (String jarPath : hbaseJars) {
-                if (!jarPath.isEmpty()) {
-                  lfs.copyFromLocalFile(new Path(jarPath), libDir);
-                }
-              }
-            } catch (Throwable t) {
-              String err =
-                  "Failed to add HBase jars. Use --auxhbase=false to avoid localizing them";
-              LOG.error(err);
-              System.err.println(err);
-              throw new RuntimeException(t);
-            }
-          }
-
-          HashSet<String> auxJars = new HashSet<>();
-          // There are many ways to have AUX jars in Hive... sigh
-          if (options.getIsHiveAux()) {
-            // Note: we don't add ADDED jars, RELOADABLE jars, etc. That is by design; there are too many ways
-            // to add jars in Hive, some of which are session/etc. specific. Env + conf + arg should be enough.
-            addAuxJarsToSet(auxJars, conf.getAuxJars(), ",");
-            addAuxJarsToSet(auxJars, System.getenv("HIVE_AUX_JARS_PATH"), ":");
-            LOG.info("Adding the following aux jars from the environment and configs: " + auxJars);
-          }
-
-          addAuxJarsToSet(auxJars, options.getAuxJars(), ",");
-          for (String jarPath : auxJars) {
-            lfs.copyFromLocalFile(new Path(jarPath), libDir);
-          }
-          return null;
-        }
-
-        private void addAuxJarsToSet(HashSet<String> auxJarSet, String auxJars, String delimiter) {
-          if (auxJars != null && !auxJars.isEmpty()) {
-            // TODO: transitive dependencies warning?
-            String[] jarPaths = auxJars.split(delimiter);
-            for (String jarPath : jarPaths) {
-              if (!jarPath.isEmpty()) {
-                auxJarSet.add(jarPath);
-              }
-            }
-          }
-        }
-      };
-
-      NamedCallable<Void> copyUdfJars = new NamedCallable<Void>("copyUdfJars") {
-        @Override
-        public Void call() throws Exception {
-          // UDFs
-          final Set<String> allowedUdfs;
-
-          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS)) {
-            synchronized (fs) {
-              allowedUdfs = downloadPermanentFunctions(conf, udfDir);
-            }
-          } else {
-            allowedUdfs = Collections.emptySet();
-          }
-
-          PrintWriter udfStream =
-              new PrintWriter(lfs.create(new Path(confPath,
-                  StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST)));
-          for (String udfClass : allowedUdfs) {
-            udfStream.println(udfClass);
-          }
-
-          udfStream.close();
-          return null;
-        }
-      };
-
-      String java_home;
-      if (options.getJavaPath() == null || options.getJavaPath().isEmpty()) {
-        java_home = System.getenv("JAVA_HOME");
-        String jre_home = System.getProperty("java.home");
-        if (java_home == null) {
-          java_home = jre_home;
-        } else if (!java_home.equals(jre_home)) {
-          LOG.warn("Java versions might not match : JAVA_HOME=[{}],process jre=[{}]", java_home,
-              jre_home);
-        }
-      } else {
-        java_home = options.getJavaPath();
-      }
-      if (java_home == null || java_home.isEmpty()) {
-        throw new RuntimeException(
-            "Could not determine JAVA_HOME from command line parameters, environment or system properties");
-      }
-      LOG.info("Using [{}] for JAVA_HOME", java_home);
-
-      NamedCallable<Void> copyConfigs = new NamedCallable<Void>("copyConfigs") {
-        @Override
-        public Void call() throws Exception {
-          // Copy over the mandatory configs for the package.
-          for (String f : NEEDED_CONFIGS) {
-            copyConfig(lfs, confPath, f);
-          }
-          for (String f : OPTIONAL_CONFIGS) {
-            try {
-              copyConfig(lfs, confPath, f);
-            } catch (Throwable t) {
-              LOG.info("Error getting an optional config " + f + "; ignoring: " + t.getMessage());
-            }
-          }
-          createLlapDaemonConfig(lfs, confPath, conf, propsDirectOptions, options.getConfig());
-          setUpLogAndMetricConfigs(lfs, logger, confPath);
-          return null;
-        }
-      };
-
-      @SuppressWarnings("unchecked")
-      final NamedCallable<Void>[] asyncWork =
-          new NamedCallable[] {
-          downloadTez,
-          copyUdfJars,
-          copyLocalJars,
-          copyAuxJars,
-          copyConfigs };
-      @SuppressWarnings("unchecked")
-      final Future<Void>[] asyncResults = new Future[asyncWork.length];
-      for (int i = 0; i < asyncWork.length; i++) {
-        asyncResults[i] = asyncRunner.submit(asyncWork[i]);
-      }
-
-      // TODO: need to move from Python to Java for the rest of the script.
-      JSONObject configs = createConfigJson(containerSize, cache, xmx, java_home);
-      writeConfigJson(tmpDir, lfs, configs);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Config generation took " + (System.nanoTime() - t0) + " ns");
-      }
-      for (int i = 0; i < asyncWork.length; i++) {
-        final long t1 = System.nanoTime();
-        asyncResults[i].get();
-        final long t2 = System.nanoTime();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(asyncWork[i].getName() + " waited for " + (t2 - t1) + " ns");
-        }
-      }
-      if (options.isStarting()) {
-        String version = System.getenv("HIVE_VERSION");
-        if (version == null || version.isEmpty()) {
-          version = DateTime.now().toString("ddMMMyyyy");
-        }
-
-        String outputDir = options.getOutput();
-        Path packageDir = null;
-        if (outputDir == null) {
-          outputDir = OUTPUT_DIR_PREFIX + version;
-          packageDir = new Path(Paths.get(".").toAbsolutePath().toString(),
-              OUTPUT_DIR_PREFIX + version);
-        } else {
-          packageDir = new Path(outputDir);
-        }
-        rc = runPackagePy(args, tmpDir, scriptParent, version, outputDir);
-        if (rc == 0) {
-          LlapSliderUtils.startCluster(conf, options.getName(),
-              "llap-" + version + ".tar.gz", packageDir,
-              HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME));
-        }
-      } else {
-        rc = 0;
-      }
-    } finally {
-      executor.shutdown();
-      lfs.close();
-      fs.close();
-    }
-
-    if (rc == 0) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Exiting successfully");
-      }
-    } else {
-      LOG.info("Exiting with rc = " + rc);
-    }
-    return rc;
-  }
-
-  private int runPackagePy(String[] args, Path tmpDir, Path scriptParent,
-      String version, String outputDir) throws IOException, InterruptedException {
-    Path scriptPath = new Path(new Path(scriptParent, "yarn"), "package.py");
-    List<String> scriptArgs = new ArrayList<>(args.length + 7);
-    scriptArgs.add("python");
-    scriptArgs.add(scriptPath.toString());
-    scriptArgs.add("--input");
-    scriptArgs.add(tmpDir.toString());
-    scriptArgs.add("--output");
-    scriptArgs.add(outputDir);
-    scriptArgs.add("--javaChild");
-    for (String arg : args) {
-      scriptArgs.add(arg);
-    }
-    LOG.debug("Calling package.py via: " + scriptArgs);
-    ProcessBuilder builder = new ProcessBuilder(scriptArgs);
-    builder.redirectError(ProcessBuilder.Redirect.INHERIT);
-    builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
-    builder.environment().put("HIVE_VERSION", version);
-    return builder.start().waitFor();
-  }
-
-  private void writeConfigJson(Path tmpDir, final FileSystem lfs,
-      JSONObject configs) throws IOException, JSONException {
-    FSDataOutputStream os = lfs.create(new Path(tmpDir, "config.json"));
-    OutputStreamWriter w = new OutputStreamWriter(os);
-    configs.write(w);
-    w.close();
-    os.close();
-  }
-
-  private JSONObject createConfigJson(long containerSize, long cache, long xmx,
-                                      String java_home) throws JSONException {
-    // extract configs for processing by the python fragments in YARN Service
-    JSONObject configs = new JSONObject();
-
-    configs.put("java.home", java_home);
-
-    configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
-        HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
-    configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
-
-    configs.put(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
-        HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
-
-    configs.put(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT.varname,
-        HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
-
-    configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
-        HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
-
-    configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname,
-        HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
-
-    configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
-        HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
-
-    // Let YARN pick the queue name, if it isn't provided in hive-site, or via the command-line
-    if (HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME) != null) {
-      configs.put(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
-          HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME));
-    }
-
-    // Propagate the cluster name to the script.
-    String clusterHosts = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
-    if (!StringUtils.isEmpty(clusterHosts) && clusterHosts.startsWith("@")
-        && clusterHosts.length() > 1) {
-      configs.put(CONFIG_CLUSTER_NAME, clusterHosts.substring(1));
-    }
-
-    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
-
-    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
-        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
-
-    long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25) ? (long) (cache * 1.25) : -1;
-    configs.put("max_direct_memory", Long.toString(maxDirect));
-    return configs;
-  }
-
-  private Set<String> downloadPermanentFunctions(Configuration conf, Path udfDir) throws HiveException,
-      URISyntaxException, IOException {
-    Map<String,String> udfs = new HashMap<String, String>();
-    HiveConf hiveConf = new HiveConf();
-    // disable expensive operations on the metastore
-    hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_INIT_METADATA_COUNT_ENABLED, false);
-    hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, false);
-    // performance problem: ObjectStore does its own new HiveConf()
-    Hive hive = Hive.getWithFastCheck(hiveConf, false);
-    ResourceDownloader resourceDownloader =
-        new ResourceDownloader(conf, udfDir.toUri().normalize().getPath());
-    List<Function> fns = hive.getAllFunctions();
-    Set<URI> srcUris = new HashSet<>();
-    for (Function fn : fns) {
-      String fqfn = fn.getDbName() + "." + fn.getFunctionName();
-      if (udfs.containsKey(fn.getClassName())) {
-        LOG.warn("Duplicate function names found for " + fn.getClassName() + " with " + fqfn
-            + " and " + udfs.get(fn.getClassName()));
-      }
-      udfs.put(fn.getClassName(), fqfn);
-      List<ResourceUri> resources = fn.getResourceUris();
-      if (resources == null || resources.isEmpty()) {
-        LOG.warn("Missing resources for " + fqfn);
-        continue;
-      }
-      for (ResourceUri resource : resources) {
-        srcUris.add(ResourceDownloader.createURI(resource.getUri()));
-      }
-    }
-    for (URI srcUri : srcUris) {
-      List<URI> localUris = resourceDownloader.downloadExternal(srcUri, null, false);
-      for(URI dst : localUris) {
-        LOG.warn("Downloaded " + dst + " from " + srcUri);
-      }
-    }
-    return udfs.keySet();
-  }
-
-  private void addJarForClassToListIfExists(String cls, List<String> jarList) {
-    try {
-      Class.forName(cls);
-      jarList.add(cls);
-    } catch (Exception e) {
-    }
-  }
-  private List<String> getDbSpecificJdbcJars() {
-    List<String> jdbcJars = new ArrayList<String>();
-    addJarForClassToListIfExists("com.mysql.jdbc.Driver", jdbcJars); // add mysql jdbc driver
-    addJarForClassToListIfExists("org.postgresql.Driver", jdbcJars); // add postgresql jdbc driver
-    addJarForClassToListIfExists("oracle.jdbc.OracleDriver", jdbcJars); // add oracle jdbc driver
-    addJarForClassToListIfExists("com.microsoft.sqlserver.jdbc.SQLServerDriver", jdbcJars); // add mssql jdbc driver
-    return jdbcJars;
-  }
-
-  private void localizeJarForClass(FileSystem lfs, Path libDir, String className, boolean doThrow)
-      throws IOException {
-    String jarPath = null;
-    boolean hasException = false;
-    try {
-      Class<?> auxClass = Class.forName(className);
-      jarPath = Utilities.jarFinderGetJar(auxClass);
-    } catch (Throwable t) {
-      if (doThrow) {
-        throw (t instanceof IOException) ? (IOException)t : new IOException(t);
-      }
-      hasException = true;
-      String err = "Cannot find a jar for [" + className + "] due to an exception ("
-          + t.getMessage() + "); not packaging the jar";
-      LOG.error(err);
-      System.err.println(err);
-    }
-    if (jarPath != null) {
-      lfs.copyFromLocalFile(new Path(jarPath), libDir);
-    } else if (!hasException) {
-      String err = "Cannot find a jar for [" + className + "]; not packaging the jar";
-      if (doThrow) {
-        throw new IOException(err);
-      }
-      LOG.error(err);
-      System.err.println(err);
-    }
-  }
-
-  /**
-   *
-   * @param lfs filesystem on which file will be generated
-   * @param confPath path where the config will be generated
-   * @param configured the base configuration instances
-   * @param direct properties specified directly - i.e. using the properties exact option
-   * @param hiveconf properties specified via --hiveconf
-   * @throws IOException
-   */
-  private void createLlapDaemonConfig(FileSystem lfs, Path confPath, Configuration configured,
-                                      Properties direct, Properties hiveconf) throws IOException {
-    FSDataOutputStream confStream =
-        lfs.create(new Path(confPath, LlapDaemonConfiguration.LLAP_DAEMON_SITE));
-
-    Configuration llapDaemonConf = resolve(configured, direct, hiveconf);
-
-    llapDaemonConf.writeXml(confStream);
-    confStream.close();
-  }
-
-  private void copyConfig(FileSystem lfs, Path confPath, String f) throws IOException {
-    HiveConf.getBoolVar(new Configuration(false), ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS);
-    // they will be file:// URLs
-    lfs.copyFromLocalFile(new Path(conf.getResource(f).toString()), confPath);
-  }
-
-  private void setUpLogAndMetricConfigs(final FileSystem lfs, final URL logger,
-      final Path confPath) throws IOException {
-    // logger can be a resource stream or a real file (cannot use copy)
-    InputStream loggerContent = logger.openStream();
-    IOUtils.copyBytes(loggerContent,
-        lfs.create(new Path(confPath, "llap-daemon-log4j2.properties"), true), conf, true);
-
-    String metricsFile = LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
-    URL metrics2 = conf.getResource(metricsFile);
-    if (metrics2 == null) {
-      LOG.warn(LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot be found."
-          + " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE);
-      metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE;
-      metrics2 = conf.getResource(metricsFile);
-    }
-    if (metrics2 != null) {
-      InputStream metrics2FileStream = metrics2.openStream();
-      IOUtils.copyBytes(metrics2FileStream,
-          lfs.create(new Path(confPath, metricsFile), true), conf, true);
-      LOG.info("Copied hadoop metrics2 properties file from " + metrics2);
-    } else {
-      LOG.warn("Cannot find " + LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or "
-          + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath.");
-    }
-  }
-}
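
For reference, the container-sizing heuristic buried in the deleted run() method above (applied when --size is not given) can be restated as a standalone sketch; the method name and example figures are illustrative, not part of the patch:

    // Derive the YARN container size from the working memory (Xmx) and cache size,
    // as the deleted driver did. All values are in bytes; isDirect mirrors the
    // LLAP_ALLOCATOR_DIRECT setting read above.
    static long deriveContainerSize(long xmx, long cache, boolean isDirect) {
      long heapSize = xmx;
      if (!isDirect) {
        heapSize += cache; // an on-heap cache counts toward the heap
      }
      // allow 20% headroom, but no more than 1 GB beyond the heap
      long size = Math.min((long) (heapSize * 1.2), heapSize + 1024L * 1024 * 1024);
      if (isDirect) {
        size += cache; // an off-heap cache still needs room in the container
      }
      return size;
    }

    // Example: xmx = 4 GB, cache = 8 GB, direct allocator:
    // heapSize = 4 GB; min(4.8 GB, 5 GB) = 4.8 GB; plus the 8 GB cache = 12.8 GB.
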
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java
index bdec1c1..a9a216c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java
 
 package org.apache.hadoop.hive.llap.cli;
 
-import java.io.File;
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.client.ServiceClient;
-import org.apache.hadoop.yarn.service.utils.CoreFileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class LlapSliderUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(LlapSliderUtils.class);
-  private static final String LLAP_PACKAGE_DIR = ".yarn/package/LLAP/";
-
   public static ServiceClient createServiceClient(Configuration conf) throws Exception {
     ServiceClient serviceClient = new ServiceClient();
     serviceClient.init(conf);
     serviceClient.start();
     return serviceClient;
   }
-
-  public static Service getService(Configuration conf, String name) {
-    LOG.info("Get service details for " + name);
-    ServiceClient sc;
-    try {
-      sc = createServiceClient(conf);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    Service service = null;
-    try {
-      service = sc.getStatus(name);
-    } catch (YarnException | IOException e) {
-      // Probably the app does not exist
-      LOG.info(e.getLocalizedMessage());
-      throw new RuntimeException(e);
-    } finally {
-      try {
-        sc.close();
-      } catch (IOException e) {
-        LOG.info("Failed to close service client", e);
-      }
-    }
-    return service;
-  }
-
-  public static void startCluster(Configuration conf, String name, String packageName, Path packageDir, String queue) {
-    LOG.info("Starting cluster with " + name + ", " + packageName + ", " + queue + ", " + packageDir);
-    ServiceClient sc;
-    try {
-      sc = createServiceClient(conf);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    try {
-      try {
-        LOG.info("Executing the stop command");
-        sc.actionStop(name, true);
-      } catch (Exception ex) {
-        // Ignore exceptions from stop
-        LOG.info(ex.getLocalizedMessage());
-      }
-      try {
-        LOG.info("Executing the destroy command");
-        sc.actionDestroy(name);
-      } catch (Exception ex) {
-        // Ignore exceptions from destroy
-        LOG.info(ex.getLocalizedMessage());
-      }
-      LOG.info("Uploading the app tarball");
-      CoreFileSystem fs = new CoreFileSystem(conf);
-      fs.createWithPermissions(new Path(LLAP_PACKAGE_DIR),
-          FsPermission.getDirDefault());
-      fs.copyLocalFileToHdfs(new File(packageDir.toString(), packageName),
-          new Path(LLAP_PACKAGE_DIR), new FsPermission("755"));
-
-      LOG.info("Executing the launch command");
-      File yarnfile = new File(new Path(packageDir, "Yarnfile").toString());
-      Long lifetime = null; // unlimited lifetime
-      try {
-        sc.actionLaunch(yarnfile.getAbsolutePath(), name, lifetime, queue);
-      } finally {
-      }
-      LOG.debug("Started the cluster via service API");
-    } catch (YarnException | IOException e) {
-      throw new RuntimeException(e);
-    } finally {
-      try {
-        sc.close();
-      } catch (IOException e) {
-        LOG.info("Failed to close service client", e);
-      }
-    }
-  }
-
 }
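
Only createServiceClient remains in this class; the getService and startCluster helpers move into the new service package. For orientation, a minimal sketch of a caller, assuming a cluster named "llap0" (hypothetical) and a Configuration in scope:

    ServiceClient sc = LlapSliderUtils.createServiceClient(conf);
    try {
      Service service = sc.getStatus("llap0"); // throws if the app does not exist
      System.out.println("LLAP state: " + service.getState());
    } finally {
      sc.close(); // releases the underlying YARN client
    }
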
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java
new file mode 100644 (file)
index 0000000..7b2e32b
--- /dev/null
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Copy auxiliary jars for the tarball. */
+class AsyncTaskCopyAuxJars implements Callable<Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncTaskCopyAuxJars.class.getName());
+
+  private static final String[] DEFAULT_AUX_CLASSES =
+      new String[] {"org.apache.hive.hcatalog.data.JsonSerDe", "org.apache.hadoop.hive.druid.DruidStorageHandler",
+          "org.apache.hive.storage.jdbc.JdbcStorageHandler", "org.apache.commons.dbcp.BasicDataSourceFactory",
+          "org.apache.commons.pool.impl.GenericObjectPool", "org.apache.hadoop.hive.kafka.KafkaStorageHandler"};
+  private static final String HBASE_SERDE_CLASS = "org.apache.hadoop.hive.hbase.HBaseSerDe";
+
+  private final LlapServiceCommandLine cl;
+  private final HiveConf conf;
+  private final FileSystem rawFs;
+  private final Path libDir;
+
+  AsyncTaskCopyAuxJars(LlapServiceCommandLine cl, HiveConf conf, FileSystem rawFs, Path libDir) {
+    this.cl = cl;
+    this.conf = conf;
+    this.rawFs = rawFs;
+    this.libDir = libDir;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    localizeJarForClass(Arrays.asList(DEFAULT_AUX_CLASSES), false);
+    localizeJarForClass(conf.getStringCollection("io.compression.codecs"), false);
+    localizeJarForClass(getDbSpecificJdbcJars(), false);
+
+    if (cl.getIsHBase()) {
+      try {
+        localizeJarForClass(Arrays.asList(HBASE_SERDE_CLASS), true);
+        Job fakeJob = Job.getInstance(new JobConf()); // HBase API is convoluted.
+        TableMapReduceUtil.addDependencyJars(fakeJob);
+        Collection<String> hbaseJars = fakeJob.getConfiguration().getStringCollection("tmpjars");
+        for (String jarPath : hbaseJars) {
+          if (!jarPath.isEmpty()) {
+            rawFs.copyFromLocalFile(new Path(jarPath), libDir);
+          }
+        }
+      } catch (Throwable t) {
+        String err = "Failed to add HBase jars. Use --auxhbase=false to avoid localizing them";
+        LOG.error(err);
+        System.err.println(err);
+        throw new RuntimeException(t);
+      }
+    }
+
+    Set<String> auxJars = new HashSet<>();
+    // There are many ways to have AUX jars in Hive... sigh
+    if (cl.getIsHiveAux()) {
+      // Note: we don't add ADDED jars, RELOADABLE jars, etc. That is by design; there are too many ways
+      // to add jars in Hive, some of which are session/etc. specific. Env + conf + arg should be enough.
+      addAuxJarsToSet(auxJars, conf.getAuxJars(), ",");
+      addAuxJarsToSet(auxJars, System.getenv("HIVE_AUX_JARS_PATH"), ":");
+      LOG.info("Adding the following aux jars from the environment and configs: " + auxJars);
+    }
+
+    addAuxJarsToSet(auxJars, cl.getAuxJars(), ",");
+    for (String jarPath : auxJars) {
+      rawFs.copyFromLocalFile(new Path(jarPath), libDir);
+    }
+    return null;
+  }
+
+  private void localizeJarForClass(Collection<String> classNames, boolean doThrow) throws IOException {
+    if (CollectionUtils.isEmpty(classNames)) {
+      return;
+    }
+
+    for (String className : classNames) {
+      String jarPath = null;
+      boolean hasException = false;
+      try {
+        Class<?> clazz = Class.forName(className);
+        jarPath = Utilities.jarFinderGetJar(clazz);
+      } catch (Throwable t) {
+        if (doThrow) {
+          throw (t instanceof IOException) ? (IOException)t : new IOException(t);
+        }
+        hasException = true;
+        String err = "Cannot find a jar for [" + className + "] due to an exception (" +
+            t.getMessage() + "); not packaging the jar";
+        LOG.error(err);
+        System.err.println(err);
+      }
+
+      if (jarPath != null) {
+        rawFs.copyFromLocalFile(new Path(jarPath), libDir);
+      } else if (!hasException) {
+        String err = "Cannot find a jar for [" + className + "]; not packaging the jar";
+        if (doThrow) {
+          throw new IOException(err);
+        }
+        LOG.error(err);
+        System.err.println(err);
+      }
+    }
+  }
+
+  private List<String> getDbSpecificJdbcJars() {
+    List<String> jdbcJars = new ArrayList<String>();
+    addJarForClassToListIfExists("com.mysql.jdbc.Driver", jdbcJars); // add mysql jdbc driver
+    addJarForClassToListIfExists("org.postgresql.Driver", jdbcJars); // add postgresql jdbc driver
+    addJarForClassToListIfExists("oracle.jdbc.OracleDriver", jdbcJars); // add oracle jdbc driver
+    addJarForClassToListIfExists("com.microsoft.sqlserver.jdbc.SQLServerDriver", jdbcJars); // add mssql jdbc driver
+    return jdbcJars;
+  }
+
+  private void addJarForClassToListIfExists(String cls, List<String> jarList) {
+    try {
+      Class.forName(cls);
+      jarList.add(cls);
+    } catch (Exception e) {
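+      // The driver class is not on the classpath; silently skip this optional JDBC jar.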
+    }
+  }
+
+  private void addAuxJarsToSet(Set<String> auxJarSet, String auxJars, String delimiter) {
+    if (StringUtils.isNotEmpty(auxJars)) {
+      // TODO: transitive dependencies warning?
+      String[] jarPaths = auxJars.split(delimiter);
+      for (String jarPath : jarPaths) {
+        if (!jarPath.isEmpty()) {
+          auxJarSet.add(jarPath);
+        }
+      }
+    }
+  }
+}
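
The class-to-jar resolution above goes through Hive's Utilities.jarFinderGetJar. As a JDK-only sketch of the same idea, assuming the class was loaded from a plain jar on the local filesystem (the real helper handles more cases):

    static String findJarFor(String className) {
      try {
        java.security.CodeSource src =
            Class.forName(className).getProtectionDomain().getCodeSource();
        // src is null for bootstrap classes; callers treat null as "jar not found"
        return (src == null) ? null : src.getLocation().getPath();
      } catch (ClassNotFoundException e) {
        return null; // not on the classpath
      }
    }
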
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyConfigs.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyConfigs.java
new file mode 100644 (file)
index 0000000..9d5b385
--- /dev/null
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Properties;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Copy config files for the tarball. */
+class AsyncTaskCopyConfigs implements Callable<Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncTaskCopyConfigs.class.getName());
+
+  private final LlapServiceCommandLine cl;
+  private final HiveConf conf;
+  private final Properties directProperties;
+  private final FileSystem rawFs;
+  private final Path confDir;
+
+  AsyncTaskCopyConfigs(LlapServiceCommandLine cl, HiveConf conf, Properties directProperties, FileSystem rawFs,
+      Path confDir) {
+    this.cl = cl;
+    this.conf = conf;
+    this.directProperties = directProperties;
+    this.rawFs = rawFs;
+    this.confDir = confDir;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    // Copy over the mandatory configs for the package.
+    for (String f : LlapDaemonConfiguration.DAEMON_CONFIGS) {
+      copyConfig(f);
+    }
+    for (String f : LlapDaemonConfiguration.SSL_DAEMON_CONFIGS) {
+      try {
+        copyConfig(f);
+      } catch (Throwable t) {
+        LOG.info("Error getting an optional config " + f + "; ignoring: " + t.getMessage());
+      }
+    }
+    createLlapDaemonConfig();
+    setUpLoggerConfig();
+    setUpMetricsConfig();
+    return null;
+  }
+
+  private void copyConfig(String f) throws IOException {
+    HiveConf.getBoolVar(new Configuration(false), ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS);
+    // they will be file:// URLs
+    rawFs.copyFromLocalFile(new Path(conf.getResource(f).toString()), confDir);
+  }
+
+  private void createLlapDaemonConfig() throws IOException {
+    FSDataOutputStream confStream = rawFs.create(new Path(confDir, LlapDaemonConfiguration.LLAP_DAEMON_SITE));
+
+    Configuration llapDaemonConf = resolve();
+
+    llapDaemonConf.writeXml(confStream);
+    confStream.close();
+  }
+
+  private Configuration resolve() {
+    Configuration target = new Configuration(false);
+
+    populateConf(target, cl.getConfig(), "CLI hiveconf");
+    populateConf(target, directProperties, "CLI direct");
+
+    return target;
+  }
+
+  private void populateConf(Configuration target, Properties properties, String source) {
+    for (Entry<Object, Object> entry : properties.entrySet()) {
+      String key = (String) entry.getKey();
+      String val = conf.get(key);
+      if (val != null) {
+        target.set(key, val, source);
+      }
+    }
+  }
+
+  private void setUpLoggerConfig() throws Exception {
+    // logger can be a resource stream or a real file (cannot use copy)
+    URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
+    if (logger == null) {
+      throw new Exception("Unable to find required config file: llap-daemon-log4j2.properties");
+    }
+    InputStream loggerContent = logger.openStream();
+    IOUtils.copyBytes(loggerContent,
+        rawFs.create(new Path(confDir, "llap-daemon-log4j2.properties"), true), conf, true);
+  }
+
+  private void setUpMetricsConfig() throws IOException {
+    String metricsFile = LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
+    URL metrics2 = conf.getResource(metricsFile);
+    if (metrics2 == null) {
+      LOG.warn(LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot be found." +
+          " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE);
+      metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE;
+      metrics2 = conf.getResource(metricsFile);
+    }
+    if (metrics2 != null) {
+      InputStream metrics2FileStream = metrics2.openStream();
+      IOUtils.copyBytes(metrics2FileStream, rawFs.create(new Path(confDir, metricsFile), true), conf, true);
+      LOG.info("Copied hadoop metrics2 properties file from " + metrics2);
+    } else {
+      LOG.warn("Cannot find " + LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or " +
+          LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath.");
+    }
+  }
+}
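
Note the subtlety in populateConf above: the Properties argument supplies only the key set, while the value is read back from the merged HiveConf, so substitutions already resolved there win over the raw CLI literal. A reduced illustration of that pattern (all names hypothetical):

    Configuration merged = new Configuration(false);
    merged.set("hive.llap.daemon.service.hosts", "@llap0");

    Properties cliKeys = new Properties();
    cliKeys.setProperty("hive.llap.daemon.service.hosts", "ignored-literal");

    Configuration target = new Configuration(false);
    for (String key : cliKeys.stringPropertyNames()) {
      String val = merged.get(key);           // effective value, not the CLI literal
      if (val != null) {
        target.set(key, val, "CLI hiveconf"); // the third argument records the source
      }
    }
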
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyLocalJars.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyLocalJars.java
new file mode 100644 (file)
index 0000000..90f9b2c
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.eclipse.jetty.rewrite.handler.Rule;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Copy local jars for the tarball. */
+class AsyncTaskCopyLocalJars implements Callable<Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncTaskCopyLocalJars.class.getName());
+
+  private final FileSystem rawFs;
+  private final Path libDir;
+
+  AsyncTaskCopyLocalJars(FileSystem rawFs, Path libDir) {
+    this.rawFs = rawFs;
+    this.libDir = libDir;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    Class<?>[] dependencies = new Class<?>[] {
+        LlapDaemonProtocolProtos.class, // llap-common
+        LlapTezUtils.class, // llap-tez
+        LlapInputFormat.class, // llap-server
+        HiveInputFormat.class, // hive-exec
+        SslContextFactory.class, // hive-common (https deps)
+        Rule.class, // Jetty rewrite class
+        RegistryUtils.ServiceRecordMarshal.class, // ZK registry
+        // log4j2
+        com.lmax.disruptor.RingBuffer.class, // disruptor
+        org.apache.logging.log4j.Logger.class, // log4j-api
+        org.apache.logging.log4j.core.Appender.class, // log4j-core
+        org.apache.logging.slf4j.Log4jLogger.class, // log4j-slf4j
+        // log4j-1.2-API needed for NDC
+        org.apache.log4j.config.Log4j1ConfigurationFactory.class,
+        io.netty.util.NetUtil.class, // netty4
+        org.jboss.netty.util.NetUtil.class, //netty3
+        org.apache.arrow.vector.types.pojo.ArrowType.class, //arrow-vector
+        org.apache.arrow.memory.BaseAllocator.class, //arrow-memory
+        org.apache.arrow.flatbuf.Schema.class, //arrow-format
+        com.google.flatbuffers.Table.class, //flatbuffers
+        com.carrotsearch.hppc.ByteArrayDeque.class //hppc
+    };
+
+    for (Class<?> c : dependencies) {
+      Path jarPath = new Path(Utilities.jarFinderGetJar(c));
+      rawFs.copyFromLocalFile(jarPath, libDir);
+      LOG.debug("Copying " + jarPath + " to " + libDir);
+    }
+    return null;
+  }
+}
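
Each of these components is a plain Callable&lt;Void&gt;, so they can be gathered concurrently. A hedged sketch of the wiring (the real version lives in LlapTarComponentGatherer; conf, fs, rawFs, libDir and tezDir are assumed to be in scope):

    ExecutorService executor = Executors.newFixedThreadPool(4,
        new ThreadFactoryBuilder().setNameFormat("llap-pkg-%d").build());
    try {
      List<Future<Void>> futures = new ArrayList<>();
      futures.add(executor.submit(new AsyncTaskCopyLocalJars(rawFs, libDir)));
      futures.add(executor.submit(new AsyncTaskDownloadTezJars(conf, fs, rawFs, libDir, tezDir)));
      for (Future<Void> future : futures) {
        future.get(); // surfaces the first task failure as an ExecutionException
      }
    } finally {
      executor.shutdown();
    }
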
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCreateUdfFile.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCreateUdfFile.java
new file mode 100644 (file)
index 0000000..430471e
--- /dev/null
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.util.ResourceDownloader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Create the list of allowed UDFs for the tarball. */
+class AsyncTaskCreateUdfFile implements Callable<Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncTaskCreateUdfFile.class.getName());
+
+  private final HiveConf conf;
+  private final FileSystem fs;
+  private final FileSystem rawFs;
+  private final Path udfDir;
+  private final Path confDir;
+
+  AsyncTaskCreateUdfFile(HiveConf conf, FileSystem fs, FileSystem rawFs, Path udfDir, Path confDir) {
+    this.conf = conf;
+    this.fs = fs;
+    this.rawFs = rawFs;
+    this.udfDir = udfDir;
+    this.confDir = confDir;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    // UDFs
+    final Set<String> allowedUdfs;
+
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS)) {
+      synchronized (fs) {
+        allowedUdfs = downloadPermanentFunctions();
+      }
+    } else {
+      allowedUdfs = Collections.emptySet();
+    }
+
+    // try-with-resources ensures the stream is closed even if a write fails
+    try (OutputStream os = rawFs.create(new Path(confDir, StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST));
+        OutputStreamWriter osw = new OutputStreamWriter(os, Charset.defaultCharset());
+        PrintWriter udfStream = new PrintWriter(osw)) {
+      for (String udfClass : allowedUdfs) {
+        udfStream.println(udfClass);
+      }
+    }
+    return null;
+  }
+
+  private Set<String> downloadPermanentFunctions() throws HiveException, URISyntaxException, IOException {
+    Map<String, String> udfs = new HashMap<String, String>();
+    HiveConf hiveConf = new HiveConf();
+    // disable expensive operations on the metastore
+    hiveConf.setBoolean(MetastoreConf.ConfVars.INIT_METADATA_COUNT_ENABLED.getVarname(), false);
+    hiveConf.setBoolean(MetastoreConf.ConfVars.METRICS_ENABLED.getVarname(), false);
+    // performance problem: ObjectStore does its own new HiveConf()
+    Hive hive = Hive.getWithFastCheck(hiveConf, false);
+    ResourceDownloader resourceDownloader = new ResourceDownloader(conf, udfDir.toUri().normalize().getPath());
+    List<Function> fns = hive.getAllFunctions();
+    Set<URI> srcUris = new HashSet<>();
+    for (Function fn : fns) {
+      String fqfn = fn.getDbName() + "." + fn.getFunctionName();
+      if (udfs.containsKey(fn.getClassName())) {
+        LOG.warn("Duplicate function names found for " + fn.getClassName() + " with " + fqfn + " and " +
+            udfs.get(fn.getClassName()));
+      }
+      udfs.put(fn.getClassName(), fqfn);
+      List<ResourceUri> resources = fn.getResourceUris();
+      if (resources == null || resources.isEmpty()) {
+        LOG.warn("Missing resources for " + fqfn);
+        continue;
+      }
+      for (ResourceUri resource : resources) {
+        srcUris.add(ResourceDownloader.createURI(resource.getUri()));
+      }
+    }
+    for (URI srcUri : srcUris) {
+      List<URI> localUris = resourceDownloader.downloadExternal(srcUri, null, false);
+      for (URI dst : localUris) {
+        LOG.warn("Downloaded " + dst + " from " + srcUri);
+      }
+    }
+    return udfs.keySet();
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskDownloadTezJars.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskDownloadTezJars.java
new file mode 100644 (file)
index 0000000..29b05a6
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.CompressionUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Download tez related jars for the tarball. */
+class AsyncTaskDownloadTezJars implements Callable<Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncTaskDownloadTezJars.class.getName());
+
+  private final HiveConf conf;
+  private final FileSystem fs;
+  private final FileSystem rawFs;
+  private final Path libDir;
+  private final Path tezDir;
+
+  AsyncTaskDownloadTezJars(HiveConf conf, FileSystem fs, FileSystem rawFs, Path libDir, Path tezDir) {
+    this.conf = conf;
+    this.fs = fs;
+    this.rawFs = rawFs;
+    this.libDir = libDir;
+    this.tezDir = tezDir;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    synchronized (fs) {
+      String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS);
+      if (tezLibs == null) {
+        LOG.warn("Missing tez.lib.uris in tez-site.xml; not packaging the tez jars");
+        return null;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Copying tez libs from " + tezLibs);
+      }
+      rawFs.mkdirs(tezDir);
+      fs.copyToLocalFile(new Path(tezLibs), new Path(libDir, "tez.tar.gz"));
+      CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), tezDir.toString(), true);
+      rawFs.delete(new Path(libDir, "tez.tar.gz"), false);
+    }
+    return null;
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapConfigJsonCreator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapConfigJsonCreator.java
new file mode 100644 (file)
index 0000000..8e9b939
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * Creates the config json for llap start.
+ */
+class LlapConfigJsonCreator {
+  // This is not a config that users set in hive-site. Its only use is to share information
+  // between the Java component of the service driver and the Python component.
+  private static final String CONFIG_CLUSTER_NAME = "private.hive.llap.servicedriver.cluster.name";
+
+  private final HiveConf conf;
+  private final FileSystem fs;
+  private final Path tmpDir;
+
+  private final long cache;
+  private final long xmx;
+  private final String javaHome;
+
+  LlapConfigJsonCreator(HiveConf conf, FileSystem fs, Path tmpDir, long cache, long xmx, String javaHome) {
+    this.conf = conf;
+    this.fs = fs;
+    this.tmpDir = tmpDir;
+    this.cache = cache;
+    this.xmx = xmx;
+    this.javaHome = javaHome;
+  }
+
+  void createLlapConfigJson() throws Exception {
+    JSONObject configs = createConfigJson();
+    writeConfigJson(configs);
+  }
+
+  private JSONObject createConfigJson() throws JSONException {
+    // extract configs for processing by the python fragments in YARN Service
+    JSONObject configs = new JSONObject();
+
+    configs.put("java.home", javaHome);
+
+    configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
+        conf.getLongVar(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
+
+    configs.put(ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, conf.getSizeVar(ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
+
+    configs.put(ConfVars.LLAP_ALLOCATOR_DIRECT.varname, conf.getBoolVar(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
+
+    configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
+        conf.getIntVar(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
+
+    configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname,
+        conf.getIntVar(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
+
+    configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
+
+    // Let YARN pick the queue name, if it isn't provided in hive-site, or via the command-line
+    if (conf.getVar(ConfVars.LLAP_DAEMON_QUEUE_NAME) != null) {
+      configs.put(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, conf.getVar(ConfVars.LLAP_DAEMON_QUEUE_NAME));
+    }
+
+    // Propagate the cluster name to the script.
+    String clusterHosts = conf.getVar(ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+    if (!StringUtils.isEmpty(clusterHosts) && clusterHosts.startsWith("@") && clusterHosts.length() > 1) {
+      configs.put(CONFIG_CLUSTER_NAME, clusterHosts.substring(1));
+    }
+
+    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
+
+    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
+
+    long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25) ? (long) (cache * 1.25) : -1;
+    configs.put("max_direct_memory", Long.toString(maxDirect));
+
+    return configs;
+  }
+
+  private void writeConfigJson(JSONObject configs) throws Exception {
+    try (FSDataOutputStream fsdos = fs.create(new Path(tmpDir, "config.json"));
+         OutputStreamWriter w = new OutputStreamWriter(fsdos, Charset.defaultCharset())) {
+      configs.write(w);
+    }
+  }
+}
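
The max_direct_memory entry above is only set when both cache and xmx are configured and xmx < cache * 1.25; for example, cache=8g and xmx=4g yield 10g. For reference, a minimal round-trip with the Jettison API used here (values hypothetical; fs and tmpDir assumed in scope):

    JSONObject configs = new JSONObject();
    configs.put("java.home", "/usr/lib/jvm/default");
    configs.put("max_direct_memory", Long.toString(10L * 1024 * 1024 * 1024));
    try (FSDataOutputStream out = fs.create(new Path(tmpDir, "config.json"));
        OutputStreamWriter w = new OutputStreamWriter(out, Charset.defaultCharset())) {
      configs.write(w);
    }
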
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceCommandLine.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceCommandLine.java
new file mode 100644 (file)
index 0000000..5323102
--- /dev/null
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import jline.TerminalFactory;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.log.LogHelpers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
+
+@SuppressWarnings("static-access")
+class LlapServiceCommandLine {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapServiceCommandLine.class.getName());
+
+  private static final Option DIRECTORY = OptionBuilder
+      .withLongOpt("directory")
+      .withDescription("Temp directory for jars etc.")
+      .withArgName("directory")
+      .hasArg()
+      .create('d');
+
+  private static final Option NAME = OptionBuilder
+      .withLongOpt("name")
+      .withDescription("Cluster name for YARN registry")
+      .withArgName("name")
+      .hasArg()
+      .create('n');
+
+  private static final Option EXECUTORS = OptionBuilder
+      .withLongOpt("executors")
+      .withDescription("executor per instance")
+      .withArgName("executors")
+      .hasArg()
+      .create('e');
+
+  private static final Option IO_THREADS = OptionBuilder
+      .withLongOpt("iothreads")
+      .withDescription("iothreads per instance")
+      .withArgName("iothreads")
+      .hasArg()
+      .create('t');
+
+  private static final Option CACHE = OptionBuilder
+      .withLongOpt("cache")
+      .withDescription("cache size per instance")
+      .withArgName("cache")
+      .hasArg()
+      .create('c');
+
+  private static final Option SIZE = OptionBuilder
+      .withLongOpt("size")
+      .withDescription("cache size per instance")
+      .withArgName("size")
+      .hasArg()
+      .create('s');
+
+  private static final Option XMX = OptionBuilder
+      .withLongOpt("xmx")
+      .withDescription("working memory size")
+      .withArgName("xmx")
+      .hasArg()
+      .create('w');
+
+  private static final Option AUXJARS = OptionBuilder
+      .withLongOpt("auxjars")
+      .withDescription("additional jars to package (by default, JSON SerDe jar is packaged if available)")
+      .withArgName("auxjars")
+      .hasArg()
+      .create('j');
+
+  private static final Option AUXHBASE = OptionBuilder
+      .withLongOpt("auxhbase")
+      .withDescription("whether to package the HBase jars (true by default)")
+      .withArgName("auxhbase")
+      .hasArg()
+      .create('h');
+
+  private static final Option HIVECONF = OptionBuilder
+      .withLongOpt("hiveconf")
+      .withDescription("Use value for given property. Overridden by explicit parameters")
+      .withArgName("property=value")
+      .hasArgs(2)
+      .withValueSeparator()
+      .create();
+
+  private static final Option JAVAHOME = OptionBuilder
+      .withLongOpt("javaHome")
+      .withDescription("Path to the JRE/JDK. This should be installed at the same location on all cluster nodes " +
+          "($JAVA_HOME, java.home by default)")
+      .withArgName("javaHome")
+      .hasArg()
+      .create();
+
+  private static final Option QUEUE = OptionBuilder
+      .withLongOpt("queue")
+      .withDescription("The queue within which LLAP will be started")
+      .withArgName("queue")
+      .hasArg()
+      .create('q');
+
+  private static final Set<String> VALID_LOGGERS = ImmutableSet.of(LogHelpers.LLAP_LOGGER_NAME_RFA.toLowerCase(),
+      LogHelpers.LLAP_LOGGER_NAME_QUERY_ROUTING.toLowerCase(), LogHelpers.LLAP_LOGGER_NAME_CONSOLE.toLowerCase());
+
+  private static final Option LOGGER = OptionBuilder
+      .withLongOpt("logger")
+      .withDescription("logger for llap instance ([" + VALID_LOGGERS + "]")
+      .withArgName("logger")
+      .hasArg()
+      .create();
+
+  private static final Option START = OptionBuilder
+      .withLongOpt("startImmediately")
+      .withDescription("immediately start the cluster")
+      .withArgName("startImmediately")
+      .hasArg(false)
+      .create('z');
+
+  private static final Option OUTPUT = OptionBuilder
+      .withLongOpt("output")
+      .withDescription("Output directory for the generated scripts")
+      .withArgName("output")
+      .hasArg()
+      .create();
+
+  private static final Option AUXHIVE = OptionBuilder
+      .withLongOpt("auxhive")
+      .withDescription("whether to package the Hive aux jars (true by default)")
+      .withArgName("auxhive")
+      .hasArg()
+      .create("auxhive");
+
+  private static final Option HELP = OptionBuilder
+      .withLongOpt("help")
+      .withDescription("Print help information")
+      .withArgName("help")
+      .hasArg(false)
+      .create('H');
+
+  // Options for the python script that are here because our option parser cannot ignore the unknown ones
+  private static final String OPTION_INSTANCES = "instances";
+  private static final String OPTION_ARGS = "args";
+  private static final String OPTION_LOGLEVEL = "loglevel";
+  private static final String OPTION_SERVICE_KEYTAB_DIR = "service-keytab-dir";
+  private static final String OPTION_SERVICE_KEYTAB = "service-keytab";
+  private static final String OPTION_SERVICE_PRINCIPAL = "service-principal";
+  private static final String OPTION_SERVICE_PLACEMENT = "service-placement";
+  private static final String OPTION_SERVICE_DEFAULT_KEYTAB = "service-default-keytab";
+  private static final String OPTION_HEALTH_PERCENT = "health-percent";
+  private static final String OPTION_HEALTH_TIME_WINDOW_SECS = "health-time-window-secs";
+  private static final String OPTION_HEALTH_INIT_DELAY_SECS = "health-init-delay-secs";
+  private static final String OPTION_SERVICE_AM_CONTAINER_MB = "service-am-container-mb";
+  private static final String OPTION_SERVICE_APPCONFIG_GLOBAL = "service-appconfig-global";
+
+  private static final Options OPTIONS = new Options();
+  static {
+    OPTIONS.addOption(DIRECTORY);
+    OPTIONS.addOption(NAME);
+    OPTIONS.addOption(EXECUTORS);
+    OPTIONS.addOption(IO_THREADS);
+    OPTIONS.addOption(CACHE);
+    OPTIONS.addOption(SIZE);
+    OPTIONS.addOption(XMX);
+    OPTIONS.addOption(AUXJARS);
+    OPTIONS.addOption(AUXHBASE);
+    OPTIONS.addOption(HIVECONF);
+    OPTIONS.addOption(JAVAHOME);
+    OPTIONS.addOption(QUEUE);
+    OPTIONS.addOption(LOGGER);
+    OPTIONS.addOption(START);
+    OPTIONS.addOption(OUTPUT);
+    OPTIONS.addOption(AUXHIVE);
+    OPTIONS.addOption(HELP);
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_INSTANCES)
+        .withDescription("Specify the number of instances to run this on")
+        .withArgName(OPTION_INSTANCES)
+        .hasArg()
+        .create('i'));
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_ARGS)
+        .withDescription("java arguments to the llap instance")
+        .withArgName(OPTION_ARGS)
+        .hasArg()
+        .create('a'));
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_LOGLEVEL)
+        .withDescription("log levels for the llap instance")
+        .withArgName(OPTION_LOGLEVEL)
+        .hasArg()
+        .create('l'));
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_KEYTAB_DIR)
+        .withDescription("Service AM keytab directory on HDFS (where the headless user keytab is stored by Service " +
+            "keytab installation, e.g. .yarn/keytabs/llap)")
+        .withArgName(OPTION_SERVICE_KEYTAB_DIR)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_KEYTAB)
+        .withDescription("Service AM keytab file name inside " + OPTION_SERVICE_KEYTAB_DIR)
+        .withArgName(OPTION_SERVICE_KEYTAB)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_PRINCIPAL)
+        .withDescription("Service AM principal; should be the user running the cluster, e.g. hive@EXAMPLE.COM")
+        .withArgName(OPTION_SERVICE_PRINCIPAL)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_PLACEMENT)
+        .withDescription("Service placement policy; see YARN documentation at " +
+            "https://issues.apache.org/jira/browse/YARN-1042. This is unnecessary if LLAP is going to take more than " +
+            "half of the YARN capacity of a node.")
+        .withArgName(OPTION_SERVICE_PLACEMENT)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_DEFAULT_KEYTAB)
+        .withDescription("try to set default settings for Service AM keytab; mostly for dev testing")
+        .withArgName(OPTION_SERVICE_DEFAULT_KEYTAB)
+        .hasArg(false)
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_HEALTH_PERCENT)
+        .withDescription("Percentage of running containers after which LLAP application is considered healthy" +
+            " (Default: 80)")
+        .withArgName(OPTION_HEALTH_PERCENT)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_HEALTH_TIME_WINDOW_SECS)
+        .withDescription("Time window in seconds (after initial delay) for which LLAP application is allowed to be " +
+            "in unhealthy state before being killed (Default: 300)")
+        .withArgName(OPTION_HEALTH_TIME_WINDOW_SECS)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_HEALTH_INIT_DELAY_SECS)
+        .withDescription("Delay in seconds after which health percentage is monitored (Default: 400)")
+        .withArgName(OPTION_HEALTH_INIT_DELAY_SECS)
+        .hasArg()
+        .create());
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_AM_CONTAINER_MB)
+        .withDescription("The size of the service AppMaster container in MB")
+        .withArgName("b")
+        .hasArg()
+        .create('b'));
+
+    OPTIONS.addOption(OptionBuilder
+        .withLongOpt(OPTION_SERVICE_APPCONFIG_GLOBAL)
+        .withDescription("Property (key=value) to be set in the global section of the Service appConfig")
+        .withArgName("property=value")
+        .hasArgs(2)
+        .withValueSeparator()
+        .create());
+  }
+
+  private String[] args;
+
+  private String directory;
+  private String name;
+  private int executors;
+  private int ioThreads;
+  private long cache;
+  private long size;
+  private long xmx;
+  private String jars;
+  private boolean isHbase;
+  private Properties conf = new Properties();
+  private String javaPath = null;
+  private String llapQueueName;
+  private String logger = null;
+  private boolean isStarting;
+  private String output;
+  private boolean isHiveAux;
+  private boolean isHelp;
+
+  static LlapServiceCommandLine parseArguments(String[] args) {
+    LlapServiceCommandLine cl = null;
+    try {
+      cl = new LlapServiceCommandLine(args);
+    } catch (Exception e) {
+      LOG.error("Parsing the command line arguments failed", e);
+      printUsage();
+      System.exit(1);
+    }
+
+    if (cl.isHelp) {
+      printUsage();
+      System.exit(0);
+    }
+
+    return cl;
+  }
+
+  LlapServiceCommandLine(String[] args) throws ParseException {
+    LOG.info("LLAP invoked with arguments = {}", Arrays.toString(args));
+    this.args = args;
+    parseCommandLine(args);
+  }
+
+  private void parseCommandLine(String[] args) throws ParseException {
+    CommandLine cl = new GnuParser().parse(OPTIONS, args);
+    if (cl.hasOption(HELP.getOpt())) {
+      isHelp = true;
+      return;
+    }
+
+    if (!cl.hasOption(OPTION_INSTANCES)) {
+      printUsage();
+      throw new ParseException("instance must be set");
+    }
+
+    int instances = Integer.parseInt(cl.getOptionValue(OPTION_INSTANCES));
+    if (instances <= 0) {
+      throw new ParseException("Invalid configuration: " + instances + " (should be greater than 0)");
+    }
+
+    directory = cl.getOptionValue(DIRECTORY.getOpt());
+    name = cl.getOptionValue(NAME.getOpt());
+    executors = Integer.parseInt(cl.getOptionValue(EXECUTORS.getOpt(), "-1"));
+    ioThreads = Integer.parseInt(cl.getOptionValue(IO_THREADS.getOpt(), Integer.toString(executors)));
+    cache = TraditionalBinaryPrefix.string2long(cl.getOptionValue(CACHE.getOpt(), "-1"));
+    size = TraditionalBinaryPrefix.string2long(cl.getOptionValue(SIZE.getOpt(), "-1"));
+    xmx = TraditionalBinaryPrefix.string2long(cl.getOptionValue(XMX.getOpt(), "-1"));
+    jars = cl.getOptionValue(AUXJARS.getOpt());
+    isHbase = Boolean.parseBoolean(cl.getOptionValue(AUXHBASE.getOpt(), "true"));
+    if (cl.hasOption(HIVECONF.getLongOpt())) {
+      conf = cl.getOptionProperties(HIVECONF.getLongOpt());
+    }
+    if (cl.hasOption(JAVAHOME.getLongOpt())) {
+      javaPath = cl.getOptionValue(JAVAHOME.getLongOpt());
+    }
+    llapQueueName = cl.getOptionValue(QUEUE.getOpt(), ConfVars.LLAP_DAEMON_QUEUE_NAME.getDefaultValue());
+    if (cl.hasOption(LOGGER.getLongOpt())) {
+      logger = cl.getOptionValue(LOGGER.getLongOpt());
+      Preconditions.checkArgument(VALID_LOGGERS.contains(logger.toLowerCase()), "Unsupported logger: " + logger);
+    }
+    isStarting = cl.hasOption(START.getOpt());
+    output = cl.getOptionValue(OUTPUT.getLongOpt());
+    isHiveAux = Boolean.parseBoolean(cl.getOptionValue(AUXHIVE.getOpt(), "true"));
+  }
+
+  private static void printUsage() {
+    HelpFormatter hf = new HelpFormatter();
+    try {
+      int width = hf.getWidth();
+      int jlineWidth = TerminalFactory.get().getWidth();
+      width = Math.min(160, Math.max(jlineWidth, width));
+      hf.setWidth(width);
+    } catch (Throwable t) { // Ignore
+    }
+
+    hf.printHelp("llap", OPTIONS);
+  }
+
+  String[] getArgs() {
+    return args;
+  }
+
+  String getDirectory() {
+    return directory;
+  }
+
+  String getName() {
+    return name;
+  }
+
+  int getExecutors() {
+    return executors;
+  }
+
+  int getIoThreads() {
+    return ioThreads;
+  }
+
+  long getCache() {
+    return cache;
+  }
+
+  long getSize() {
+    return size;
+  }
+
+  long getXmx() {
+    return xmx;
+  }
+
+  String getAuxJars() {
+    return jars;
+  }
+
+  boolean getIsHBase() {
+    return isHbase;
+  }
+
+  boolean getIsHiveAux() {
+    return isHiveAux;
+  }
+
+  Properties getConfig() {
+    return conf;
+  }
+
+  String getJavaPath() {
+    return javaPath;
+  }
+
+  String getLlapQueueName() {
+    return llapQueueName;
+  }
+
+  String getLogger() {
+    return logger;
+  }
+
+  boolean isStarting() {
+    return isStarting;
+  }
+
+  String getOutput() {
+    return output;
+  }
+}
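
A self-contained example of the key=value option style defined above. hasArgs(2) combined with withValueSeparator() makes commons-cli split each argument on '=':

    Options options = new Options();
    options.addOption(OptionBuilder
        .withLongOpt("hiveconf")
        .withValueSeparator()
        .hasArgs(2)
        .create());
    CommandLine parsed = new GnuParser().parse(options,
        new String[] {"--hiveconf", "hive.llap.daemon.queue.name=llap"});
    Properties props = parsed.getOptionProperties("hiveconf");
    // props.getProperty("hive.llap.daemon.queue.name") evaluates to "llap"
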
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceDriver.java
new file mode 100644 (file)
index 0000000..bbc7265
--- /dev/null
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.cli.LlapSliderUtils;
+import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.service.utils.CoreFileSystem;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** Starts the llap daemon. */
+public class LlapServiceDriver {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapServiceDriver.class.getName());
+
+  private static final String LLAP_PACKAGE_DIR = ".yarn/package/LLAP/";
+  private static final String OUTPUT_DIR_PREFIX = "llap-yarn-";
+
+  /**
+   * This is a working configuration for the instance to merge various variables.
+   * It is not written out for llap server usage
+   */
+  private final HiveConf conf;
+  private final LlapServiceCommandLine cl;
+
+  public LlapServiceDriver(LlapServiceCommandLine cl) throws Exception {
+    this.cl = cl;
+
+    SessionState ss = SessionState.get();
+    this.conf = (ss != null) ? ss.getConf() : new HiveConf(SessionState.class);
+    if (conf == null) {
+      throw new Exception("Cannot load any configuration to run command");
+    }
+  }
+
+  private int run() throws Exception {
+    Properties propsDirectOptions = new Properties();
+
+    // Working directory.
+    Path tmpDir = new Path(cl.getDirectory());
+
+    long t0 = System.nanoTime();
+
+    FileSystem fs = FileSystem.get(conf);
+    FileSystem rawFs = FileSystem.getLocal(conf).getRawFileSystem();
+
+    int threadCount = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
+    ExecutorService executor = Executors.newFixedThreadPool(threadCount,
+            new ThreadFactoryBuilder().setNameFormat("llap-pkg-%d").build());
+
+    int rc = 0;
+    try {
+
+      setupConf(propsDirectOptions);
+
+      URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
+      if (logger == null) {
+        throw new Exception("Unable to find required config file: llap-daemon-log4j2.properties");
+      }
+
+      Path home = new Path(System.getenv("HIVE_HOME"));
+      Path scriptParent = new Path(new Path(home, "scripts"), "llap");
+      Path scripts = new Path(scriptParent, "bin");
+
+      if (!rawFs.exists(home)) {
+        throw new Exception("Unable to find HIVE_HOME:" + home);
+      } else if (!rawFs.exists(scripts)) {
+        LOG.warn("Unable to find llap scripts:" + scripts);
+      }
+
+      String javaHome = getJavaHome();
+
+      LlapTarComponentGatherer tarComponentGatherer = new LlapTarComponentGatherer(cl, conf, propsDirectOptions,
+          fs, rawFs, executor, tmpDir);
+      tarComponentGatherer.createDirs();
+      tarComponentGatherer.submitTarComponentGatherTasks();
+
+      // TODO: need to move from Python to Java for the rest of the script.
+      LlapConfigJsonCreator lcjCreator = new LlapConfigJsonCreator(conf, rawFs, tmpDir, cl.getCache(), cl.getXmx(),
+          javaHome);
+      lcjCreator.createLlapConfigJson();
+
+      LOG.debug("Config Json generation took " + (System.nanoTime() - t0) + " ns");
+
+      tarComponentGatherer.waitForFinish();
+
+      if (cl.isStarting()) {
+        rc = startLlap(tmpDir, scriptParent);
+      } else {
+        rc = 0;
+      }
+    } finally {
+      executor.shutdown();
+      rawFs.close();
+      fs.close();
+    }
+
+    if (rc == 0) {
+      LOG.debug("Exiting successfully");
+    } else {
+      LOG.info("Exiting with rc = " + rc);
+    }
+    return rc;
+  }
+
+  private void setupConf(Properties propsDirectOptions) throws Exception {
+    // needed so that the file is actually loaded into configuration.
+    for (String f : LlapDaemonConfiguration.DAEMON_CONFIGS) {
+      conf.addResource(f);
+      if (conf.getResource(f) == null) {
+        throw new Exception("Unable to find required config file: " + f);
+      }
+    }
+    for (String f : LlapDaemonConfiguration.SSL_DAEMON_CONFIGS) {
+      conf.addResource(f);
+    }
+
+    conf.reloadConfiguration();
+
+    populateConfWithLlapProperties(conf, cl.getConfig());
+
+    if (cl.getName() != null) {
+      // update service registry configs - caveat: this has nothing to do with the actual settings as read by the AM
+      // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch between instances
+      conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + cl.getName());
+      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + cl.getName());
+    }
+
+    if (cl.getLogger() != null) {
+      HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, cl.getLogger());
+      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname, cl.getLogger());
+    }
+
+    boolean isDirect = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT);
+
+    String cacheStr = LlapUtil.humanReadableByteCount(cl.getCache());
+    String sizeStr = LlapUtil.humanReadableByteCount(cl.getSize());
+    String xmxStr = LlapUtil.humanReadableByteCount(cl.getXmx());
+
+    if (cl.getSize() != -1) {
+      if (cl.getCache() != -1) {
+        if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
+          // direct heap allocations need to be safer
+          Preconditions.checkArgument(cl.getCache() < cl.getSize(), "Cache size (" + cacheStr + ") has to be smaller" +
+              " than the container sizing (" + sizeStr + ")");
+        } else if (cl.getCache() < cl.getSize()) {
+          LOG.warn("Note that this might need YARN physical memory monitoring to be turned off "
+              + "(yarn.nodemanager.pmem-check-enabled=false)");
+        }
+      }
+      if (cl.getXmx() != -1) {
+        Preconditions.checkArgument(cl.getXmx() < cl.getSize(), "Working memory (Xmx=" + xmxStr + ") has to be" +
+            " smaller than the container sizing (" + sizeStr + ")");
+      }
+      if (isDirect && !HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
+        // direct and not memory mapped
+        Preconditions.checkArgument(cl.getXmx() + cl.getCache() <= cl.getSize(), "Working memory (Xmx=" +
+            xmxStr + ") + cache size (" + cacheStr + ") has to be smaller than the container sizing (" + sizeStr + ")");
+      }
+    }
+
+    if (cl.getExecutors() != -1) {
+      conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, cl.getExecutors());
+      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, String.valueOf(cl.getExecutors()));
+      // TODO: vcpu settings - possibly when DRFA works right
+    }
+
+    if (cl.getIoThreads() != -1) {
+      conf.setLong(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname, cl.getIoThreads());
+      propsDirectOptions.setProperty(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname, String.valueOf(cl.getIoThreads()));
+    }
+
+    long cache = cl.getCache();
+    if (cache != -1) {
+      conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
+      propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
+    }
+
+    long xmx = cl.getXmx();
+    if (xmx != -1) {
+      // Xmx is not the usable max heap size on JDK8: roughly 50% of the survivor fraction has to
+      // be subtracted from it to get the memory that is actually usable before the heap goes into GC.
+      long xmxMb = (xmx / (1024L * 1024L));
+      conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmxMb);
+      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, String.valueOf(xmxMb));
+    }
+
+    long containerSize = cl.getSize();
+    if (containerSize == -1) {
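+      // No explicit --size was given: derive one as heap + ~20% headroom, capped at heap + 1gb
+      // (for a direct, off-heap cache the cache size is added on top afterwards).
+      // E.g. xmx 8g and cache 6g on-heap: heapSize = 14g, container = min(16.8g, 15g) = 15g.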
+      long heapSize = xmx;
+      if (!isDirect) {
+        heapSize += cache;
+      }
+      containerSize = Math.min((long)(heapSize * 1.2), heapSize + 1024L * 1024 * 1024);
+      if (isDirect) {
+        containerSize += cache;
+      }
+    }
+    long containerSizeMB = containerSize / (1024 * 1024);
+    long minAllocMB = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
+    String containerSizeStr = LlapUtil.humanReadableByteCount(containerSize);
+    Preconditions.checkArgument(containerSizeMB >= minAllocMB, "Container size (" + containerSizeStr + ") should be " +
+        "at least the minimum allocation (" + LlapUtil.humanReadableByteCount(minAllocMB * 1024L * 1024L) + ")");
+    conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSizeMB);
+    propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, String.valueOf(containerSizeMB));
+
+    LOG.info("Memory settings: container memory: {} executor memory: {} cache memory: {}", containerSizeStr, xmxStr,
+        cacheStr);
+
+    if (!StringUtils.isEmpty(cl.getLlapQueueName())) {
+      conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, cl.getLlapQueueName());
+      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, cl.getLlapQueueName());
+    }
+  }
+
+  private String getJavaHome() {
+    String javaHome = cl.getJavaPath();
+    if (StringUtils.isEmpty(javaHome)) {
+      javaHome = System.getenv("JAVA_HOME");
+      String jreHome = System.getProperty("java.home");
+      if (javaHome == null) {
+        javaHome = jreHome;
+      } else if (!javaHome.equals(jreHome)) {
+        LOG.warn("Java versions might not match : JAVA_HOME=[{}],process jre=[{}]", javaHome, jreHome);
+      }
+    }
+    if (StringUtils.isEmpty(javaHome)) {
+      throw new RuntimeException(
+          "Could not determine JAVA_HOME from command line parameters, environment or system properties");
+    }
+    LOG.info("Using [{}] for JAVA_HOME", javaHome);
+    return javaHome;
+  }
+
+  private static void populateConfWithLlapProperties(Configuration conf, Properties properties) {
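+    // Known llap daemon settings are applied directly; unknown keys are only applied (with a
+    // warning) if they carry an llap prefix, everything else is ignored.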
+    for (Entry<Object, Object> props : properties.entrySet()) {
+      String key = (String) props.getKey();
+      if (HiveConf.getLlapDaemonConfVars().contains(key)) {
+        conf.set(key, (String) props.getValue());
+      } else {
+        if (key.startsWith(HiveConf.PREFIX_LLAP) || key.startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
+          LOG.warn("Adding key [{}] even though it is not in the set of known llap-server keys", key);
+          conf.set(key, (String) props.getValue());
+        } else {
+          LOG.warn("Ignoring unknown llap server parameter: [{}]", key);
+        }
+      }
+    }
+  }
+
+  private int startLlap(Path tmpDir, Path scriptParent) throws IOException, InterruptedException {
+    int rc;
+    String version = System.getenv("HIVE_VERSION");
+    if (StringUtils.isEmpty(version)) {
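+      // fall back to a date based version, e.g. "22Jan2019"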
+      version = DateTime.now().toString("ddMMMyyyy");
+    }
+
+    String outputDir = cl.getOutput();
+    Path packageDir = null;
+    if (outputDir == null) {
+      outputDir = OUTPUT_DIR_PREFIX + version;
+      packageDir = new Path(Paths.get(".").toAbsolutePath().toString(), OUTPUT_DIR_PREFIX + version);
+    } else {
+      packageDir = new Path(outputDir);
+    }
+
+    rc = runPackagePy(tmpDir, scriptParent, version, outputDir);
+    if (rc == 0) {
+      String tarballName = "llap-" + version + ".tar.gz";
+      startCluster(conf, cl.getName(), tarballName, packageDir, conf.getVar(ConfVars.LLAP_DAEMON_QUEUE_NAME));
+    }
+    return rc;
+  }
+
+  private int runPackagePy(Path tmpDir, Path scriptParent, String version, String outputDir)
+      throws IOException, InterruptedException {
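+    // The assembled call looks like:
+    //   python <scriptParent>/yarn/package.py --input <tmpDir> --output <outputDir> --javaChild <extra args>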
+    Path scriptPath = new Path(new Path(scriptParent, "yarn"), "package.py");
+    List<String> scriptArgs = new ArrayList<>(cl.getArgs().length + 7);
+    scriptArgs.addAll(Arrays.asList("python", scriptPath.toString(), "--input", tmpDir.toString(), "--output",
+        outputDir, "--javaChild"));
+    scriptArgs.addAll(Arrays.asList(cl.getArgs()));
+
+    LOG.debug("Calling package.py via: " + scriptArgs);
+    ProcessBuilder builder = new ProcessBuilder(scriptArgs);
+    builder.redirectError(ProcessBuilder.Redirect.INHERIT);
+    builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
+    builder.environment().put("HIVE_VERSION", version);
+    return builder.start().waitFor();
+  }
+
+  private void startCluster(Configuration conf, String name, String packageName, Path packageDir, String queue) {
+    LOG.info("Starting cluster with " + name + ", " + packageName + ", " + queue + ", " + packageDir);
+    ServiceClient sc;
+    try {
+      sc = LlapSliderUtils.createServiceClient(conf);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    try {
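+      // Stop and destroy any previous instance with the same name first, so that the freshly
+      // built package is the one that gets launched.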
+      try {
+        LOG.info("Executing the stop command");
+        sc.actionStop(name, true);
+      } catch (Exception ex) { // Ignore exceptions from stop
+        LOG.info(ex.getLocalizedMessage());
+      }
+      try {
+        LOG.info("Executing the destroy command");
+        sc.actionDestroy(name);
+      } catch (Exception ex) { // Ignore exceptions from destroy
+        LOG.info(ex.getLocalizedMessage());
+      }
+      LOG.info("Uploading the app tarball");
+      CoreFileSystem fs = new CoreFileSystem(conf);
+      fs.createWithPermissions(new Path(LLAP_PACKAGE_DIR), FsPermission.getDirDefault());
+      fs.copyLocalFileToHdfs(new File(packageDir.toString(), packageName), new Path(LLAP_PACKAGE_DIR),
+          new FsPermission("755"));
+
+      LOG.info("Executing the launch command");
+      File yarnfile = new File(new Path(packageDir, "Yarnfile").toString());
+      Long lifetime = null; // unlimited lifetime
+      sc.actionLaunch(yarnfile.getAbsolutePath(), name, lifetime, queue);
+      LOG.debug("Started the cluster via service API");
+    } catch (YarnException | IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      try {
+        sc.close();
+      } catch (IOException e) {
+        LOG.info("Failed to close service client", e);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    LlapServiceCommandLine cl = new LlapServiceCommandLine(args);
+    int ret = 0;
+    try {
+      ret = new LlapServiceDriver(cl).run();
+    } catch (Throwable t) {
+      System.err.println("Failed: " + t.getMessage());
+      t.printStackTrace();
+      ret = 3;
+    } finally {
+      LOG.info("LLAP service driver finished");
+    }
+    LOG.debug("Completed processing - exiting with " + ret);
+    System.exit(ret);
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapTarComponentGatherer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapTarComponentGatherer.java
new file mode 100644 (file)
index 0000000..a83647b
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Gathers all the components (jars, configs, udf file) necessary to start LLAP.
+ */
+class LlapTarComponentGatherer {
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTarComponentGatherer.class.getName());
+
+  // Using Callable<Void> instead of Runnable to be able to throw exceptions.
+  private final Map<String, Future<Void>> tasks = new HashMap<>();
+
+  private final LlapServiceCommandLine cl;
+  private final HiveConf conf;
+  private final Properties directProperties;
+  private final FileSystem fs;
+  private final FileSystem rawFs;
+  private final ExecutorService executor;
+  private final Path libDir;
+  private final Path tezDir;
+  private final Path udfDir;
+  private final Path confDir;
+
+  LlapTarComponentGatherer(LlapServiceCommandLine cl, HiveConf conf, Properties directProperties, FileSystem fs,
+      FileSystem rawFs, ExecutorService executor, Path tmpDir) {
+    this.cl = cl;
+    this.conf = conf;
+    this.directProperties = directProperties;
+    this.fs = fs;
+    this.rawFs = rawFs;
+    this.executor = executor;
+    this.libDir = new Path(tmpDir, "lib");
+    this.tezDir = new Path(libDir, "tez");
+    this.udfDir = new Path(libDir, "udfs");
+    this.confDir = new Path(tmpDir, "conf");
+  }
+
+  void createDirs() throws Exception {
+    if (!rawFs.mkdirs(tezDir)) {
+      LOG.warn("mkdirs for " + tezDir + " returned false");
+    }
+    if (!rawFs.mkdirs(udfDir)) {
+      LOG.warn("mkdirs for " + udfDir + " returned false");
+    }
+    if (!rawFs.mkdirs(confDir)) {
+      LOG.warn("mkdirs for " + confDir + " returned false");
+    }
+  }
+
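+  // The five components are gathered concurrently on the shared executor; any task failure
+  // surfaces later in waitForFinish() through Future#get().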
+  void submitTarComponentGatherTasks() {
+    CompletionService<Void> asyncRunner = new ExecutorCompletionService<Void>(executor);
+
+    tasks.put("downloadTezJars", asyncRunner.submit(new AsyncTaskDownloadTezJars(conf, fs, rawFs, libDir, tezDir)));
+    tasks.put("copyLocalJars", asyncRunner.submit(new AsyncTaskCopyLocalJars(rawFs, libDir)));
+    tasks.put("copyAuxJars", asyncRunner.submit(new AsyncTaskCopyAuxJars(cl, conf, rawFs, libDir)));
+    tasks.put("createUdfFile", asyncRunner.submit(new AsyncTaskCreateUdfFile(conf, fs, rawFs, udfDir, confDir)));
+    tasks.put("copyConfigs", asyncRunner.submit(new AsyncTaskCopyConfigs(cl, conf, directProperties, rawFs,
+        confDir)));
+  }
+
+  void waitForFinish() throws Exception {
+    for (Map.Entry<String, Future<Void>> task : tasks.entrySet()) {
+      long t1 = System.nanoTime();
+      task.getValue().get();
+      long t2 = System.nanoTime();
+      LOG.debug(task.getKey() + " waited for " + (t2 - t1) + " ns");
+    }
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/package-info.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/package-info.java
new file mode 100644 (file)
index 0000000..46aacf8
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package containing the LlapServiceDriver program (and the other classes it uses), which starts up the LLAP daemon.
+ */
+package org.apache.hadoop.hive.llap.cli.service;
+
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/TestLlapServiceCommandLine.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/TestLlapServiceCommandLine.java
new file mode 100644 (file)
index 0000000..bb2a99b
--- /dev/null
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import static junit.framework.TestCase.assertEquals;
+
+import java.util.Properties;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.configuration2.ConfigurationConverter;
+import org.apache.commons.configuration2.MapConfiguration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import com.google.common.collect.ImmutableMap;
+
+/** Tests for LlapServiceCommandLine. */
+public class TestLlapServiceCommandLine {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testArgumentParsingEmpty() throws Exception {
+    thrown.expect(ParseException.class);
+    thrown.expectMessage("instance must be set");
+
+    new LlapServiceCommandLine(new String[] {});
+  }
+
+  @Test
+  public void testArgumentParsingDefault() throws Exception {
+    LlapServiceCommandLine cl = new LlapServiceCommandLine(new String[] {"--instances", "1"});
+    assertEquals(null, cl.getAuxJars());
+    assertEquals(-1, cl.getCache());
+    assertEquals(new Properties(), cl.getConfig());
+    assertEquals(null, cl.getDirectory());
+    assertEquals(-1, cl.getExecutors());
+    assertEquals(-1, cl.getIoThreads());
+    assertEquals(true, cl.getIsHBase());
+    assertEquals(true, cl.getIsHiveAux());
+    assertEquals(null, cl.getJavaPath());
+    assertEquals(null, cl.getLlapQueueName());
+    assertEquals(null, cl.getLogger());
+    assertEquals(null, cl.getName());
+    assertEquals(null, cl.getOutput());
+    assertEquals(-1, cl.getSize());
+    assertEquals(-1, cl.getXmx());
+    assertEquals(false, cl.isStarting());
+  }
+
+  @Test
+  public void testParsingArguments() throws Exception {
+    LlapServiceCommandLine cl = new LlapServiceCommandLine(new String[] {"--instances", "2", "--auxjars", "auxjarsVal",
+        "--cache", "10k", "--hiveconf", "a=b", "--directory", "directoryVal", "--executors", "4", "--iothreads", "5",
+        "--auxhbase", "false", "--auxhive", "false", "--javaHome", "javaHomeVal", "--queue", "queueVal",
+        "--logger", "console", "--name", "nameVal", "--output", "outputVal", "--size", "10m", "--xmx", "10g",
+        "--startImmediately"});
+    assertEquals("auxjarsVal", cl.getAuxJars());
+    assertEquals(10L * 1024, cl.getCache());
+    assertEquals(ConfigurationConverter.getProperties(new MapConfiguration(ImmutableMap.of("a", "b"))), cl.getConfig());
+    assertEquals("directoryVal", cl.getDirectory());
+    assertEquals(4, cl.getExecutors());
+    assertEquals(5, cl.getIoThreads());
+    assertEquals(false, cl.getIsHBase());
+    assertEquals(false, cl.getIsHiveAux());
+    assertEquals("javaHomeVal", cl.getJavaPath());
+    assertEquals("queueVal", cl.getLlapQueueName());
+    assertEquals("console", cl.getLogger());
+    assertEquals("nameVal", cl.getName());
+    assertEquals("outputVal", cl.getOutput());
+    assertEquals(10L * 1024 * 1024, cl.getSize());
+    assertEquals(10L * 1024 * 1024 * 1024, cl.getXmx());
+    assertEquals(true, cl.isStarting());
+  }
+
+  @Test
+  public void testIllegalLogger() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "1", "--logger", "someValue"});
+  }
+
+  @Test
+  public void testIllegalInstances() throws Exception {
+    thrown.expect(NumberFormatException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "a"});
+  }
+
+  @Test
+  public void testIllegalCache() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "1", "--cache", "a"});
+  }
+
+  @Test
+  public void testIllegalExecutors() throws Exception {
+    thrown.expect(NumberFormatException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "1", "--executors", "a"});
+  }
+
+  @Test
+  public void testIllegalIoThreads() throws Exception {
+    thrown.expect(NumberFormatException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "1", "--iothreads", "a"});
+  }
+
+  @Test
+  public void testIllegalSize() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "1", "--size", "a"});
+  }
+
+  @Test
+  public void testIllegalXmx() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    new LlapServiceCommandLine(new String[] {"--instances", "1", "--xmx", "a"});
+  }
+}
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/package-info.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/package-info.java
new file mode 100644 (file)
index 0000000..e8746d2
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package containing the tests for the LlapServiceDriver program and the other classes it uses.
+ */
+package org.apache.hadoop.hive.llap.cli.service;
+