YARN-7540 and YARN-7605. Convert yarn app cli to call yarn api services and implement...
authorBillie Rinaldi <billie@apache.org>
Wed, 24 Jan 2018 01:54:39 +0000 (17:54 -0800)
committerBillie Rinaldi <billie@apache.org>
Wed, 24 Jan 2018 01:54:39 +0000 (17:54 -0800)
24 files changed:
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/http/HttpServer2.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java [new file with mode: 0644]
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/package-info.java [new file with mode: 0644]
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/package-info.java [new file with mode: 0644]
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestApiServer.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java [new file with mode: 0644]
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/example-app.json [new file with mode: 0644]
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/log4j.properties [new file with mode: 0644]
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ReadinessCheck.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Resource.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/JsonSerDeser.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestBuildExternalComponents.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/WebApps.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

index fa447d8..65aadf3 100644 (file)
@@ -865,6 +865,45 @@ public final class HttpServer2 implements FilterContainer {
   }
 
   /**
+   * Add an internal servlet in the server, with initialization parameters.
+   * Note: This method is to be used for adding servlets that facilitate
+   * internal communication and not for user facing functionality. For
+   * servlets added using this method, filters (except internal Kerberos
+   * filters) are not enabled.
+   *
+   * @param name The name of the servlet (can be passed as null)
+   * @param pathSpec The path spec for the servlet
+   * @param clazz The servlet class
+   * @param params init parameters
+   */
+  public void addInternalServlet(String name, String pathSpec,
+      Class<? extends HttpServlet> clazz, Map<String, String> params) {
+    // Jetty doesn't like the same path spec mapping to different servlets, so
+    // if there's already a mapping for this pathSpec, remove it and assume that
+    // the newest one is the one we want
+    final ServletHolder sh = new ServletHolder(clazz);
+    sh.setName(name);
+    sh.setInitParameters(params);
+    final ServletMapping[] servletMappings =
+        webAppContext.getServletHandler().getServletMappings();
+    for (int i = 0; i < servletMappings.length; i++) {
+      if (servletMappings[i].containsPathSpec(pathSpec)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found existing " + servletMappings[i].getServletName() +
+              " servlet at path " + pathSpec + "; will replace mapping" +
+              " with " + sh.getName() + " servlet");
+        }
+        ServletMapping[] newServletMappings =
+            ArrayUtil.removeFromArray(servletMappings, servletMappings[i]);
+        webAppContext.getServletHandler()
+            .setServletMappings(newServletMappings);
+        break;
+      }
+    }
+    webAppContext.addServlet(sh, pathSpec);
+  }
+
+  /**
    * Add the given handler to the front of the list of handlers.
    *
    * @param handler The handler to add
index ddea2a1..bae62c6 100644 (file)
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>**/*.json</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
new file mode 100644 (file)
index 0000000..34e62b6
--- /dev/null
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.client;
+
+import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.Map;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.AppAdminClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
+import org.apache.hadoop.yarn.util.RMHAUtils;
+import org.eclipse.jetty.util.UrlEncoded;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.WebResource.Builder;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+
+import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*;
+
+/**
+ * The rest API client for users to manage services on YARN.
+ */
+public class ApiServiceClient extends AppAdminClient {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ApiServiceClient.class);
+  protected YarnClient yarnClient;
+
+  @Override protected void serviceInit(Configuration configuration)
+      throws Exception {
+    yarnClient = YarnClient.createYarnClient();
+    addService(yarnClient);
+    super.serviceInit(configuration);
+  }
+
+  /**
+   * Calculate Resource Manager address base on working REST API.
+   */
+  private String getRMWebAddress() {
+    Configuration conf = getConfig();
+    String scheme = "http://";
+    String path = "/app/v1/services/version";
+    String rmAddress = conf
+        .get("yarn.resourcemanager.webapp.address");
+    if (YarnConfiguration.useHttps(conf)) {
+      scheme = "https://";
+      rmAddress = conf
+          .get("yarn.resourcemanager.webapp.https.address");
+    }
+
+    List<String> rmServers = RMHAUtils
+        .getRMHAWebappAddresses(new YarnConfiguration(conf));
+    for (String host : rmServers) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(scheme);
+      sb.append(host);
+      sb.append(path);
+      Client client = Client.create();
+      WebResource webResource = client
+          .resource(sb.toString());
+      String test = webResource.get(String.class);
+      if (test.contains("hadoop_version")) {
+        rmAddress = host;
+        break;
+      }
+    }
+    return scheme+rmAddress;
+  }
+
+  /**
+   * Compute active resource manager API service location.
+   *
+   * @param appName - YARN service name
+   * @return URI to API Service
+   * @throws IOException
+   */
+  private String getApiUrl(String appName) throws IOException {
+    String url = getRMWebAddress();
+    StringBuilder api = new StringBuilder();
+    api.append(url);
+    api.append("/app/v1/services");
+    if (appName != null) {
+      api.append("/");
+      api.append(appName);
+    }
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      api.append("?user.name=" + UrlEncoded
+          .encodeString(System.getProperty("user.name")));
+    }
+    return api.toString();
+  }
+
+  private Builder getApiClient() throws IOException {
+    return getApiClient(null);
+  }
+
+  /**
+   * Setup API service web request.
+   *
+   * @param appName
+   * @return
+   * @throws IOException
+   */
+  private Builder getApiClient(String appName) throws IOException {
+    Client client = Client.create(getClientConfig());
+    Configuration conf = getConfig();
+    client.setChunkedEncodingSize(null);
+    Builder builder = client
+        .resource(getApiUrl(appName)).type(MediaType.APPLICATION_JSON);
+    if (conf.get("hadoop.security.authentication").equals("kerberos")) {
+      AuthenticatedURL.Token token = new AuthenticatedURL.Token();
+      builder.header("WWW-Authenticate", token);
+    }
+    return builder
+        .accept("application/json;charset=utf-8");
+  }
+
+  private ClientConfig getClientConfig() {
+    ClientConfig config = new DefaultClientConfig();
+    config.getProperties().put(
+        ClientConfig.PROPERTY_CHUNKED_ENCODING_SIZE, 0);
+    config.getProperties().put(
+        ClientConfig.PROPERTY_BUFFER_RESPONSE_ENTITY_ON_EXCEPTION, true);
+    return config;
+  }
+
+  private int processResponse(ClientResponse response) {
+    response.bufferEntity();
+    String output;
+    if (response.getStatus() == 401) {
+      LOG.error("Authentication required");
+      return EXIT_EXCEPTION_THROWN;
+    }
+    try {
+      ServiceStatus ss = response.getEntity(ServiceStatus.class);
+      output = ss.getDiagnostics();
+    } catch (Throwable t) {
+      output = response.getEntity(String.class);
+    }
+    if (output==null) {
+      output = response.getEntity(String.class);
+    }
+    if (response.getStatus() <= 299) {
+      LOG.info(output);
+      return EXIT_SUCCESS;
+    } else {
+      LOG.error(output);
+      return EXIT_EXCEPTION_THROWN;
+    }
+  }
+
+  /**
+   * Utility method to load Service json from disk or from
+   * YARN examples.
+   *
+   * @param fileName - path to yarnfile
+   * @param serviceName - YARN Service Name
+   * @param lifetime - application lifetime
+   * @param queue - Queue to submit application
+   * @return
+   * @throws IOException
+   * @throws YarnException
+   */
+  public Service loadAppJsonFromLocalFS(String fileName, String serviceName,
+      Long lifetime, String queue) throws IOException, YarnException {
+    File file = new File(fileName);
+    if (!file.exists() && fileName.equals(file.getName())) {
+      String examplesDirStr = System.getenv("YARN_SERVICE_EXAMPLES_DIR");
+      String[] examplesDirs;
+      if (examplesDirStr == null) {
+        String yarnHome = System
+            .getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
+        examplesDirs = new String[]{
+            yarnHome + "/share/hadoop/yarn/yarn-service-examples",
+            yarnHome + "/yarn-service-examples"
+        };
+      } else {
+        examplesDirs = StringUtils.split(examplesDirStr, ":");
+      }
+      for (String dir : examplesDirs) {
+        file = new File(MessageFormat.format("{0}/{1}/{2}.json",
+            dir, fileName, fileName));
+        if (file.exists()) {
+          break;
+        }
+        // Then look for secondary location.
+        file = new File(MessageFormat.format("{0}/{1}.json",
+            dir, fileName));
+        if (file.exists()) {
+          break;
+        }
+      }
+    }
+    if (!file.exists()) {
+      throw new YarnException("File or example could not be found: " +
+          fileName);
+    }
+    Path filePath = new Path(file.getAbsolutePath());
+    LOG.info("Loading service definition from local FS: " + filePath);
+    Service service = jsonSerDeser
+        .load(FileSystem.getLocal(getConfig()), filePath);
+    if (!StringUtils.isEmpty(serviceName)) {
+      service.setName(serviceName);
+    }
+    if (lifetime != null && lifetime > 0) {
+      service.setLifetime(lifetime);
+    }
+    if (!StringUtils.isEmpty(queue)) {
+      service.setQueue(queue);
+    }
+    return service;
+  }
+
+  /**
+   * Launch YARN service application.
+   *
+   * @param fileName - path to yarnfile
+   * @param appName - YARN Service Name
+   * @param lifetime - application lifetime
+   * @param queue - Queue to submit application
+   */
+  @Override
+  public int actionLaunch(String fileName, String appName, Long lifetime,
+      String queue) throws IOException, YarnException {
+    int result = EXIT_SUCCESS;
+    try {
+      Service service =
+          loadAppJsonFromLocalFS(fileName, appName, lifetime, queue);
+      String buffer = jsonSerDeser.toJson(service);
+      ClientResponse response = getApiClient()
+          .post(ClientResponse.class, buffer);
+      result = processResponse(response);
+    } catch (Exception e) {
+      LOG.error("Fail to launch application: ", e);
+      result = EXIT_EXCEPTION_THROWN;
+    }
+    return result;
+  }
+
+  /**
+   * Stop YARN service application.
+   *
+   * @param appName - YARN Service Name
+   */
+  @Override
+  public int actionStop(String appName) throws IOException, YarnException {
+    int result = EXIT_SUCCESS;
+    try {
+      Service service = new Service();
+      service.setName(appName);
+      service.setState(ServiceState.STOPPED);
+      String buffer = jsonSerDeser.toJson(service);
+      ClientResponse response = getApiClient(appName)
+          .put(ClientResponse.class, buffer);
+      result = processResponse(response);
+    } catch (Exception e) {
+      LOG.error("Fail to stop application: ", e);
+      result = EXIT_EXCEPTION_THROWN;
+    }
+    return result;
+  }
+
+  /**
+   * Start YARN service application.
+   *
+   * @param appName - YARN Service Name
+   */
+  @Override
+  public int actionStart(String appName) throws IOException, YarnException {
+    int result = EXIT_SUCCESS;
+    try {
+      Service service = new Service();
+      service.setName(appName);
+      service.setState(ServiceState.STARTED);
+      String buffer = jsonSerDeser.toJson(service);
+      ClientResponse response = getApiClient(appName)
+          .put(ClientResponse.class, buffer);
+      result = processResponse(response);
+    } catch (Exception e) {
+      LOG.error("Fail to start application: ", e);
+      result = EXIT_EXCEPTION_THROWN;
+    }
+    return result;
+  }
+
+  /**
+   * Save Service configuration.
+   *
+   * @param fileName - path to Yarnfile
+   * @param appName - YARN Service Name
+   * @param lifetime - container life time
+   * @param queue - Queue to submit the application
+   */
+  @Override
+  public int actionSave(String fileName, String appName, Long lifetime,
+      String queue) throws IOException, YarnException {
+    int result = EXIT_SUCCESS;
+    try {
+      Service service =
+          loadAppJsonFromLocalFS(fileName, appName, lifetime, queue);
+      service.setState(ServiceState.STOPPED);
+      String buffer = jsonSerDeser.toJson(service);
+      ClientResponse response = getApiClient()
+          .post(ClientResponse.class, buffer);
+      result = processResponse(response);
+    } catch (Exception e) {
+      LOG.error("Fail to save application: ", e);
+      result = EXIT_EXCEPTION_THROWN;
+    }
+    return result;
+  }
+
+  /**
+   * Decommission a YARN service.
+   *
+   * @param appName - YARN Service Name
+   */
+  @Override
+  public int actionDestroy(String appName) throws IOException, YarnException {
+    int result = EXIT_SUCCESS;
+    try {
+      ClientResponse response = getApiClient(appName)
+          .delete(ClientResponse.class);
+      result = processResponse(response);
+    } catch (Exception e) {
+      LOG.error("Fail to destroy application: ", e);
+      result = EXIT_EXCEPTION_THROWN;
+    }
+    return result;
+  }
+
+  /**
+   * Change number of containers associated with a service.
+   *
+   * @param appName - YARN Service Name
+   * @param componentCounts - list of components and desired container count
+   */
+  @Override
+  public int actionFlex(String appName, Map<String, String> componentCounts)
+      throws IOException, YarnException {
+    int result = EXIT_SUCCESS;
+    try {
+      Service service = new Service();
+      service.setName(appName);
+      service.setState(ServiceState.FLEX);
+      for (Map.Entry<String, String> entry : componentCounts.entrySet()) {
+        Component component = new Component();
+        component.setName(entry.getKey());
+        Long numberOfContainers = Long.parseLong(entry.getValue());
+        component.setNumberOfContainers(numberOfContainers);
+        service.addComponent(component);
+      }
+      String buffer = jsonSerDeser.toJson(service);
+      ClientResponse response = getApiClient(appName)
+          .put(ClientResponse.class, buffer);
+      result = processResponse(response);
+    } catch (Exception e) {
+      LOG.error("Fail to flex application: ", e);
+      result = EXIT_EXCEPTION_THROWN;
+    }
+    return result;
+  }
+
+  @Override
+  public int enableFastLaunch(String destinationFolder) throws IOException, YarnException {
+    ServiceClient sc = new ServiceClient();
+    sc.init(getConfig());
+    sc.start();
+    int result = sc.enableFastLaunch(destinationFolder);
+    sc.close();
+    return result;
+  }
+
+  /**
+   * Retrieve Service Status through REST API.
+   *
+   * @param appIdOrName - YARN application ID or application name
+   * @return Status output
+   */
+  @Override
+  public String getStatusString(String appIdOrName) throws IOException,
+      YarnException {
+    String output = "";
+    String appName;
+    try {
+      ApplicationId appId = ApplicationId.fromString(appIdOrName);
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      appName = appReport.getName();
+    } catch (IllegalArgumentException e) {
+      // not app Id format, may be app name
+      appName = appIdOrName;
+      ServiceApiUtil.validateNameFormat(appName, getConfig());
+    }
+    try {
+      ClientResponse response = getApiClient(appName).get(ClientResponse.class);
+      if (response.getStatus() != 200) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(appName);
+        sb.append(" Failed : HTTP error code : ");
+        sb.append(response.getStatus());
+        return sb.toString();
+      }
+      output = response.getEntity(String.class);
+    } catch (Exception e) {
+      LOG.error("Fail to check application status: ", e);
+    }
+    return output;
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/package-info.java
new file mode 100644 (file)
index 0000000..cf5ce11
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.service.client contains classes
+ * for YARN Services Client API.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.service.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
index 34ab8f0..16f8513 100644 (file)
@@ -19,21 +19,23 @@ package org.apache.hadoop.yarn.service.webapp;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
 import org.apache.hadoop.yarn.service.client.ServiceClient;
-import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
@@ -42,15 +44,22 @@ import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
+
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED;
 import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*;
+import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS;
 
 /**
  * The rest API endpoints for users to manage services on YARN.
@@ -71,7 +80,8 @@ public class ApiServer {
   private static final Logger LOG =
       LoggerFactory.getLogger(ApiServer.class);
   private static Configuration YARN_CONFIG = new YarnConfiguration();
-  private static ServiceClient SERVICE_CLIENT;
+  private ServiceClient serviceClientUnitTest;
+  private boolean unitTest = false;
 
   static {
     init();
@@ -79,9 +89,6 @@ public class ApiServer {
 
   // initialize all the common resources - order is important
   private static void init() {
-    SERVICE_CLIENT = new ServiceClient();
-    SERVICE_CLIENT.init(YARN_CONFIG);
-    SERVICE_CLIENT.start();
   }
 
   @GET
@@ -98,28 +105,62 @@ public class ApiServer {
   @Path(SERVICE_ROOT_PATH)
   @Consumes({ MediaType.APPLICATION_JSON })
   @Produces({ MediaType.APPLICATION_JSON })
-  public Response createService(Service service) {
-    LOG.info("POST: createService = {}", service);
+  public Response createService(@Context HttpServletRequest request,
+      Service service) {
     ServiceStatus serviceStatus = new ServiceStatus();
     try {
-      ApplicationId applicationId = SERVICE_CLIENT.actionCreate(service);
-      LOG.info("Successfully created service " + service.getName()
-          + " applicationId = " + applicationId);
+      UserGroupInformation ugi = getProxyUser(request);
+      LOG.info("POST: createService = {} user = {}", service, ugi);
+      if(service.getState()==ServiceState.STOPPED) {
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws YarnException, IOException {
+            ServiceClient sc = getServiceClient();
+            sc.init(YARN_CONFIG);
+            sc.start();
+            sc.actionBuild(service);
+            sc.close();
+            return null;
+          }
+        });
+        serviceStatus.setDiagnostics("Service "+service.getName() +
+            " saved.");
+      } else {
+        ApplicationId applicationId = ugi
+            .doAs(new PrivilegedExceptionAction<ApplicationId>() {
+              @Override
+              public ApplicationId run() throws IOException, YarnException {
+                ServiceClient sc = getServiceClient();
+                sc.init(YARN_CONFIG);
+                sc.start();
+                ApplicationId applicationId = sc.actionCreate(service);
+                sc.close();
+                return applicationId;
+              }
+            });
+        serviceStatus.setDiagnostics("Application ID: " + applicationId);
+      }
       serviceStatus.setState(ACCEPTED);
       serviceStatus.setUri(
           CONTEXT_ROOT + SERVICE_ROOT_PATH + "/" + service
               .getName());
-      return Response.status(Status.ACCEPTED).entity(serviceStatus).build();
-    } catch (IllegalArgumentException e) {
+      return formatResponse(Status.ACCEPTED, serviceStatus);
+    } catch (AccessControlException e) {
       serviceStatus.setDiagnostics(e.getMessage());
-      return Response.status(Status.BAD_REQUEST).entity(serviceStatus)
-          .build();
-    } catch (Exception e) {
-      String message = "Failed to create service " + service.getName();
+      return formatResponse(Status.FORBIDDEN, e.getCause().getMessage());
+    } catch (IllegalArgumentException e) {
+      return formatResponse(Status.BAD_REQUEST, e.getMessage());
+    } catch (IOException | InterruptedException e) {
+      String message = "Failed to create service " + service.getName()
+          + ": {}";
       LOG.error(message, e);
-      serviceStatus.setDiagnostics(message + ": " + e.getMessage());
-      return Response.status(Status.INTERNAL_SERVER_ERROR)
-          .entity(serviceStatus).build();
+      return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+    } catch (UndeclaredThrowableException e) {
+      String message = "Failed to create service " + service.getName()
+          + ": {}";
+      LOG.error(message, e);
+      return formatResponse(Status.INTERNAL_SERVER_ERROR,
+          e.getCause().getMessage());
     }
   }
 
@@ -127,23 +168,42 @@ public class ApiServer {
   @Path(SERVICE_PATH)
   @Consumes({ MediaType.APPLICATION_JSON })
   @Produces({ MediaType.APPLICATION_JSON })
-  public Response getService(@PathParam(SERVICE_NAME) String appName) {
-    LOG.info("GET: getService for appName = {}", appName);
+  public Response getService(@Context HttpServletRequest request,
+      @PathParam(SERVICE_NAME) String appName) {
     ServiceStatus serviceStatus = new ServiceStatus();
     try {
-      Service app = SERVICE_CLIENT.getStatus(appName);
+      if (appName == null) {
+        throw new IllegalArgumentException("Service name can not be null.");
+      }
+      UserGroupInformation ugi = getProxyUser(request);
+      LOG.info("GET: getService for appName = {} user = {}", appName, ugi);
+      Service app = ugi.doAs(new PrivilegedExceptionAction<Service>() {
+        @Override
+        public Service run() throws IOException, YarnException {
+          ServiceClient sc = getServiceClient();
+          sc.init(YARN_CONFIG);
+          sc.start();
+          Service app = sc.getStatus(appName);
+          sc.close();
+          return app;
+        }
+      });
       return Response.ok(app).build();
-    } catch (IllegalArgumentException e) {
+    } catch (AccessControlException e) {
+      return formatResponse(Status.FORBIDDEN, e.getMessage());
+    } catch (IllegalArgumentException |
+        FileNotFoundException e) {
       serviceStatus.setDiagnostics(e.getMessage());
       serviceStatus.setCode(ERROR_CODE_APP_NAME_INVALID);
       return Response.status(Status.NOT_FOUND).entity(serviceStatus)
           .build();
-    } catch (Exception e) {
-      LOG.error("Get service failed", e);
-      serviceStatus
-          .setDiagnostics("Failed to retrieve service: " + e.getMessage());
-      return Response.status(Status.INTERNAL_SERVER_ERROR)
-          .entity(serviceStatus).build();
+    } catch (IOException | InterruptedException e) {
+      LOG.error("Get service failed: {}", e);
+      return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+    } catch (UndeclaredThrowableException e) {
+      LOG.error("Get service failed: {}", e);
+      return formatResponse(Status.INTERNAL_SERVER_ERROR,
+          e.getCause().getMessage());
     }
   }
 
@@ -151,62 +211,111 @@ public class ApiServer {
   @Path(SERVICE_PATH)
   @Consumes({ MediaType.APPLICATION_JSON })
   @Produces({ MediaType.APPLICATION_JSON })
-  public Response deleteService(@PathParam(SERVICE_NAME) String appName) {
-    LOG.info("DELETE: deleteService for appName = {}", appName);
-    return stopService(appName, true);
+  public Response deleteService(@Context HttpServletRequest request,
+      @PathParam(SERVICE_NAME) String appName) {
+    try {
+      if (appName == null) {
+        throw new IllegalArgumentException("Service name can not be null.");
+      }
+      UserGroupInformation ugi = getProxyUser(request);
+      LOG.info("DELETE: deleteService for appName = {} user = {}",
+          appName, ugi);
+      return stopService(appName, true, ugi);
+    } catch (AccessControlException e) {
+      return formatResponse(Status.FORBIDDEN, e.getMessage());
+    } catch (IllegalArgumentException e) {
+      return formatResponse(Status.BAD_REQUEST, e.getMessage());
+    } catch (UndeclaredThrowableException e) {
+      LOG.error("Fail to stop service: {}", e);
+      return formatResponse(Status.BAD_REQUEST,
+          e.getCause().getMessage());
+    } catch (YarnException | FileNotFoundException e) {
+      return formatResponse(Status.NOT_FOUND, e.getMessage());
+    } catch (IOException | InterruptedException e) {
+      LOG.error("Fail to stop service: {}", e);
+      return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+    }
   }
 
-  private Response stopService(String appName, boolean destroy) {
-    try {
-      SERVICE_CLIENT.actionStop(appName, destroy);
-      if (destroy) {
-        SERVICE_CLIENT.actionDestroy(appName);
-        LOG.info("Successfully deleted service {}", appName);
-      } else {
-        LOG.info("Successfully stopped service {}", appName);
+  private Response stopService(String appName, boolean destroy,
+      final UserGroupInformation ugi) throws IOException,
+      InterruptedException, YarnException, FileNotFoundException {
+    ugi.doAs(new PrivilegedExceptionAction<Integer>() {
+      @Override
+      public Integer run() throws IOException, YarnException,
+          FileNotFoundException {
+        int result = 0;
+        ServiceClient sc = getServiceClient();
+        sc.init(YARN_CONFIG);
+        sc.start();
+        result = sc.actionStop(appName, destroy);
+        if (destroy) {
+          result = sc.actionDestroy(appName);
+          LOG.info("Successfully deleted service {}", appName);
+        } else {
+          LOG.info("Successfully stopped service {}", appName);
+        }
+        sc.close();
+        return result;
       }
-      return Response.status(Status.OK).build();
-    } catch (ApplicationNotFoundException e) {
-      ServiceStatus serviceStatus = new ServiceStatus();
-      serviceStatus.setDiagnostics(
-          "Service " + appName + " is not found in YARN: " + e.getMessage());
-      return Response.status(Status.BAD_REQUEST).entity(serviceStatus)
-          .build();
-    } catch (Exception e) {
-      LOG.error("Fail to stop service:", e);
-      ServiceStatus serviceStatus = new ServiceStatus();
-      serviceStatus.setDiagnostics(e.getMessage());
-      return Response.status(Status.INTERNAL_SERVER_ERROR)
-          .entity(serviceStatus).build();
+    });
+    ServiceStatus serviceStatus = new ServiceStatus();
+    if (destroy) {
+      serviceStatus.setDiagnostics("Successfully destroyed service " +
+          appName);
+    } else {
+      serviceStatus.setDiagnostics("Successfully stopped service " +
+          appName);
     }
+    return formatResponse(Status.OK, serviceStatus);
   }
 
   @PUT
   @Path(COMPONENT_PATH)
   @Consumes({ MediaType.APPLICATION_JSON })
   @Produces({ MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN  })
-  public Response updateComponent(@PathParam(SERVICE_NAME) String appName,
+  public Response updateComponent(@Context HttpServletRequest request,
+      @PathParam(SERVICE_NAME) String appName,
       @PathParam(COMPONENT_NAME) String componentName, Component component) {
 
-    if (component.getNumberOfContainers() < 0) {
-      return Response.status(Status.BAD_REQUEST).entity(
-          "Service = " + appName + ", Component = " + component.getName()
-              + ": Invalid number of containers specified " + component
-              .getNumberOfContainers()).build();
-    }
-    ServiceStatus status = new ServiceStatus();
     try {
-      Map<String, Long> original = SERVICE_CLIENT.flexByRestService(appName,
-          Collections.singletonMap(component.getName(),
-              component.getNumberOfContainers()));
+      UserGroupInformation ugi = getProxyUser(request);
+      if (component.getNumberOfContainers() < 0) {
+        String message =
+            "Service = " + appName + ", Component = " + component.getName()
+                + ": Invalid number of containers specified " + component
+                .getNumberOfContainers();
+        throw new YarnException(message);
+      }
+      Map<String, Long> original = ugi
+          .doAs(new PrivilegedExceptionAction<Map<String, Long>>() {
+            @Override
+            public Map<String, Long> run() throws YarnException, IOException {
+              ServiceClient sc = new ServiceClient();
+              sc.init(YARN_CONFIG);
+              sc.start();
+              Map<String, Long> original = sc.flexByRestService(appName,
+                  Collections.singletonMap(component.getName(),
+                      component.getNumberOfContainers()));
+              sc.close();
+              return original;
+            }
+          });
+      ServiceStatus status = new ServiceStatus();
       status.setDiagnostics(
           "Updating component (" + componentName + ") size from " + original
               .get(componentName) + " to " + component.getNumberOfContainers());
-      return Response.ok().entity(status).build();
-    } catch (YarnException | IOException e) {
-      status.setDiagnostics(e.getMessage());
-      return Response.status(Status.INTERNAL_SERVER_ERROR).entity(status)
-          .build();
+      return formatResponse(Status.OK, status);
+    } catch (AccessControlException e) {
+      return formatResponse(Status.FORBIDDEN, e.getMessage());
+    } catch (YarnException e) {
+      return formatResponse(Status.BAD_REQUEST, e.getMessage());
+    } catch (IOException | InterruptedException e) {
+      return formatResponse(Status.INTERNAL_SERVER_ERROR,
+          e.getMessage());
+    } catch (UndeclaredThrowableException e) {
+      return formatResponse(Status.INTERNAL_SERVER_ERROR,
+          e.getCause().getMessage());
     }
   }
 
@@ -214,75 +323,138 @@ public class ApiServer {
   @Path(SERVICE_PATH)
   @Consumes({ MediaType.APPLICATION_JSON })
   @Produces({ MediaType.APPLICATION_JSON })
-  public Response updateService(@PathParam(SERVICE_NAME) String appName,
+  public Response updateService(@Context HttpServletRequest request,
+      @PathParam(SERVICE_NAME) String appName,
       Service updateServiceData) {
-    LOG.info("PUT: updateService for app = {} with data = {}", appName,
-        updateServiceData);
-
-    // Ignore the app name provided in updateServiceData and always use appName
-    // path param
-    updateServiceData.setName(appName);
-
-    // For STOP the app should be running. If already stopped then this
-    // operation will be a no-op. For START it should be in stopped state.
-    // If already running then this operation will be a no-op.
-    if (updateServiceData.getState() != null
-        && updateServiceData.getState() == ServiceState.STOPPED) {
-      return stopService(appName, false);
-    }
+    try {
+      UserGroupInformation ugi = getProxyUser(request);
+      LOG.info("PUT: updateService for app = {} with data = {} user = {}",
+          appName, updateServiceData, ugi);
+      // Ignore the app name provided in updateServiceData and always use
+      // appName path param
+      updateServiceData.setName(appName);
 
-    // If a START is requested
-    if (updateServiceData.getState() != null
-        && updateServiceData.getState() == ServiceState.STARTED) {
-      return startService(appName);
-    }
+      if (updateServiceData.getState() != null
+          && updateServiceData.getState() == ServiceState.FLEX) {
+        return flexService(updateServiceData, ugi);
+      }
+      // For STOP the app should be running. If already stopped then this
+      // operation will be a no-op. For START it should be in stopped state.
+      // If already running then this operation will be a no-op.
+      if (updateServiceData.getState() != null
+          && updateServiceData.getState() == ServiceState.STOPPED) {
+        return stopService(appName, false, ugi);
+      }
+
+      // If a START is requested
+      if (updateServiceData.getState() != null
+          && updateServiceData.getState() == ServiceState.STARTED) {
+        return startService(appName, ugi);
+      }
 
-    // If new lifetime value specified then update it
-    if (updateServiceData.getLifetime() != null
-        && updateServiceData.getLifetime() > 0) {
-      return updateLifetime(appName, updateServiceData);
+      // If new lifetime value specified then update it
+      if (updateServiceData.getLifetime() != null
+          && updateServiceData.getLifetime() > 0) {
+        return updateLifetime(appName, updateServiceData, ugi);
+      }
+    } catch (UndeclaredThrowableException e) {
+      return formatResponse(Status.BAD_REQUEST,
+          e.getCause().getMessage());
+    } catch (AccessControlException e) {
+      return formatResponse(Status.FORBIDDEN, e.getMessage());
+    } catch (FileNotFoundException e) {
+      String message = "Application is not found app: " + appName;
+      LOG.error(message, e);
+      return formatResponse(Status.NOT_FOUND, e.getMessage());
+    } catch (YarnException e) {
+      String message = "Service is not found in hdfs: " + appName;
+      LOG.error(message, e);
+      return formatResponse(Status.NOT_FOUND, e.getMessage());
+    } catch (IOException | InterruptedException e) {
+      String message = "Error while performing operation for app: " + appName;
+      LOG.error(message, e);
+      return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());
     }
 
     // If nothing happens consider it a no-op
     return Response.status(Status.NO_CONTENT).build();
   }
 
-  private Response updateLifetime(String appName, Service updateAppData) {
-    ServiceStatus status = new ServiceStatus();
-    try {
-      String newLifeTime =
-          SERVICE_CLIENT.updateLifetime(appName, updateAppData.getLifetime());
-      status.setDiagnostics(
-          "Service (" + appName + ")'s lifeTime is updated to " + newLifeTime
-              + ", " + updateAppData.getLifetime()
-              + " seconds remaining");
-      return Response.ok(status).build();
-    } catch (Exception e) {
-      String message =
-          "Failed to update service (" + appName + ")'s lifetime to "
-              + updateAppData.getLifetime();
-      LOG.error(message, e);
-      status.setDiagnostics(message + ": " + e.getMessage());
-      return Response.status(Status.INTERNAL_SERVER_ERROR).entity(status)
-          .build();
+  private Response flexService(Service service, UserGroupInformation ugi)
+      throws IOException, InterruptedException {
+    String appName = service.getName();
+    Response response = Response.status(Status.BAD_REQUEST).build();
+    Map<String, String> componentCountStrings = new HashMap<String, String>();
+    for (Component c : service.getComponents()) {
+      componentCountStrings.put(c.getName(),
+          c.getNumberOfContainers().toString());
     }
-  }
+    Integer result = ugi.doAs(new PrivilegedExceptionAction<Integer>() {
 
-  private Response startService(String appName) {
-    ServiceStatus status = new ServiceStatus();
-    try {
-      SERVICE_CLIENT.actionStart(appName);
-      LOG.info("Successfully started service " + appName);
-      status.setDiagnostics("Service " + appName + " is successfully started.");
+      @Override
+      public Integer run() throws YarnException, IOException {
+        int result = 0;
+        ServiceClient sc = new ServiceClient();
+        sc.init(YARN_CONFIG);
+        sc.start();
+        result = sc
+            .actionFlex(appName, componentCountStrings);
+        sc.close();
+        return Integer.valueOf(result);
+      }
+    });
+    if (result == EXIT_SUCCESS) {
+      String message = "Service " + appName + " is successfully flexed.";
+      LOG.info(message);
+      ServiceStatus status = new ServiceStatus();
+      status.setDiagnostics(message);
       status.setState(ServiceState.ACCEPTED);
-      return Response.ok(status).build();
-    } catch (Exception e) {
-      String message = "Failed to start service " + appName;
-      status.setDiagnostics(message + ": " +  e.getMessage());
-      LOG.info(message, e);
-      return Response.status(Status.INTERNAL_SERVER_ERROR)
-          .entity(status).build();
+      response = formatResponse(Status.ACCEPTED, status);
     }
+    return response;
+  }
+
+  private Response updateLifetime(String appName, Service updateAppData,
+      final UserGroupInformation ugi) throws IOException,
+      InterruptedException {
+    String newLifeTime = ugi.doAs(new PrivilegedExceptionAction<String>() {
+      @Override
+      public String run() throws YarnException, IOException {
+        ServiceClient sc = getServiceClient();
+        sc.init(YARN_CONFIG);
+        sc.start();
+        String newLifeTime = sc.updateLifetime(appName,
+            updateAppData.getLifetime());
+        sc.close();
+        return newLifeTime;
+      }
+    });
+    ServiceStatus status = new ServiceStatus();
+    status.setDiagnostics(
+        "Service (" + appName + ")'s lifeTime is updated to " + newLifeTime
+            + ", " + updateAppData.getLifetime() + " seconds remaining");
+    return formatResponse(Status.OK, status);
+  }
+
+  private Response startService(String appName,
+      final UserGroupInformation ugi) throws IOException,
+      InterruptedException {
+    ugi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws YarnException, IOException {
+        ServiceClient sc = getServiceClient();
+        sc.init(YARN_CONFIG);
+        sc.start();
+        sc.actionStart(appName);
+        sc.close();
+        return null;
+      }
+    });
+    LOG.info("Successfully started service " + appName);
+    ServiceStatus status = new ServiceStatus();
+    status.setDiagnostics("Service " + appName + " is successfully started.");
+    status.setState(ServiceState.ACCEPTED);
+    return formatResponse(Status.OK, status);
   }
 
   /**
@@ -290,10 +462,65 @@ public class ApiServer {
    *
    * @param mockServerClient - A mocked version of ServiceClient
    */
-  public static void setServiceClient(ServiceClient mockServerClient) {
-    SERVICE_CLIENT = mockServerClient;
-    SERVICE_CLIENT.init(YARN_CONFIG);
-    SERVICE_CLIENT.start();
+  public void setServiceClient(ServiceClient mockServerClient) {
+    serviceClientUnitTest = mockServerClient;
+    unitTest = true;
+  }
+
+  private ServiceClient getServiceClient() {
+    if (unitTest) {
+      return serviceClientUnitTest;
+    } else {
+      return new ServiceClient();
+    }
+  }
+
+  /**
+   * Configure impersonation callback.
+   *
+   * @param request - web request
+   * @return - configured UGI class for proxy callback
+   * @throws IOException - if user is not login.
+   */
+  private UserGroupInformation getProxyUser(HttpServletRequest request)
+      throws AccessControlException {
+    UserGroupInformation proxyUser;
+    UserGroupInformation ugi;
+    String remoteUser = request.getRemoteUser();
+    try {
+      if (UserGroupInformation.isSecurityEnabled()) {
+        proxyUser = UserGroupInformation.getLoginUser();
+        ugi = UserGroupInformation.createProxyUser(remoteUser, proxyUser);
+      } else {
+        ugi = UserGroupInformation.createRemoteUser(remoteUser);
+      }
+      return ugi;
+    } catch (IOException e) {
+      throw new AccessControlException(e.getCause());
+    }
+  }
+
+  /**
+   * Format HTTP response.
+   *
+   * @param status - HTTP Code
+   * @param message - Diagnostic message
+   * @return - HTTP response
+   */
+  private Response formatResponse(Status status, String message) {
+    ServiceStatus entity = new ServiceStatus();
+    entity.setDiagnostics(message);
+    return formatResponse(status, entity);
   }
 
+  /**
+   * Format HTTP response.
+   *
+   * @param status - HTTP Code
+   * @param entity - ServiceStatus object
+   * @return - HTTP response
+   */
+  private Response formatResponse(Status status, ServiceStatus entity) {
+    return Response.status(status).entity(entity).build();
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/package-info.java
new file mode 100644 (file)
index 0000000..1bdf05a
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.service.webapp contains classes to be used
+ * for YARN Services API.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.service.webapp;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
index 896b2f6..52057db 100644 (file)
@@ -26,12 +26,15 @@ import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ServiceState;
 import org.apache.hadoop.yarn.service.client.ServiceClient;
 import org.apache.hadoop.yarn.service.webapp.ApiServer;
+
+import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Path;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -44,15 +47,19 @@ import static org.junit.Assert.*;
  */
 public class TestApiServer {
   private ApiServer apiServer;
+  private HttpServletRequest request;
 
   @Before
   public void setup() throws Exception {
+    request = Mockito.mock(HttpServletRequest.class);
+    Mockito.when(request.getRemoteUser())
+        .thenReturn(System.getProperty("user.name"));
     ServiceClient mockServerClient = new ServiceClientTest();
     Configuration conf = new Configuration();
     conf.set("yarn.api-service.service.client.class",
         ServiceClientTest.class.getName());
-    ApiServer.setServiceClient(mockServerClient);
-    this.apiServer = new ApiServer(conf);
+    apiServer = new ApiServer(conf);
+    apiServer.setServiceClient(mockServerClient);
   }
 
   @Test
@@ -77,7 +84,7 @@ public class TestApiServer {
   public void testBadCreateService() {
     Service service = new Service();
     // Test for invalid argument
-    final Response actual = apiServer.createService(service);
+    final Response actual = apiServer.createService(request, service);
     assertEquals("Create service is ", actual.getStatus(),
         Response.status(Status.BAD_REQUEST).build().getStatus());
   }
@@ -101,51 +108,51 @@ public class TestApiServer {
     c.setResource(resource);
     components.add(c);
     service.setComponents(components);
-    final Response actual = apiServer.createService(service);
+    final Response actual = apiServer.createService(request, service);
     assertEquals("Create service is ", actual.getStatus(),
         Response.status(Status.ACCEPTED).build().getStatus());
   }
 
   @Test
   public void testBadGetService() {
-    final Response actual = apiServer.getService("no-jenkins");
+    final Response actual = apiServer.getService(request, "no-jenkins");
     assertEquals("Get service is ", actual.getStatus(),
         Response.status(Status.NOT_FOUND).build().getStatus());
   }
 
   @Test
   public void testBadGetService2() {
-    final Response actual = apiServer.getService(null);
+    final Response actual = apiServer.getService(request, null);
     assertEquals("Get service is ", actual.getStatus(),
-        Response.status(Status.INTERNAL_SERVER_ERROR)
+        Response.status(Status.NOT_FOUND)
             .build().getStatus());
   }
 
   @Test
   public void testGoodGetService() {
-    final Response actual = apiServer.getService("jenkins");
+    final Response actual = apiServer.getService(request, "jenkins");
     assertEquals("Get service is ", actual.getStatus(),
         Response.status(Status.OK).build().getStatus());
   }
 
   @Test
   public void testBadDeleteService() {
-    final Response actual = apiServer.deleteService("no-jenkins");
+    final Response actual = apiServer.deleteService(request, "no-jenkins");
     assertEquals("Delete service is ", actual.getStatus(),
         Response.status(Status.BAD_REQUEST).build().getStatus());
   }
 
   @Test
   public void testBadDeleteService2() {
-    final Response actual = apiServer.deleteService(null);
+    final Response actual = apiServer.deleteService(request, null);
     assertEquals("Delete service is ", actual.getStatus(),
-        Response.status(Status.INTERNAL_SERVER_ERROR)
+        Response.status(Status.BAD_REQUEST)
             .build().getStatus());
   }
 
   @Test
   public void testGoodDeleteService() {
-    final Response actual = apiServer.deleteService("jenkins");
+    final Response actual = apiServer.deleteService(request, "jenkins");
     assertEquals("Delete service is ", actual.getStatus(),
         Response.status(Status.OK).build().getStatus());
   }
@@ -170,7 +177,7 @@ public class TestApiServer {
     c.setResource(resource);
     components.add(c);
     service.setComponents(components);
-    final Response actual = apiServer.updateService("jenkins",
+    final Response actual = apiServer.updateService(request, "jenkins",
         service);
     assertEquals("update service is ", actual.getStatus(),
         Response.status(Status.OK).build().getStatus());
@@ -197,7 +204,7 @@ public class TestApiServer {
     components.add(c);
     service.setComponents(components);
     System.out.println("before stop");
-    final Response actual = apiServer.updateService("no-jenkins",
+    final Response actual = apiServer.updateService(request, "no-jenkins",
         service);
     assertEquals("flex service is ", actual.getStatus(),
         Response.status(Status.BAD_REQUEST).build().getStatus());
@@ -223,7 +230,7 @@ public class TestApiServer {
     c.setResource(resource);
     components.add(c);
     service.setComponents(components);
-    final Response actual = apiServer.updateService("jenkins",
+    final Response actual = apiServer.updateService(request, "jenkins",
         service);
     assertEquals("flex service is ", actual.getStatus(),
         Response.status(Status.OK).build().getStatus());
@@ -249,10 +256,10 @@ public class TestApiServer {
     c.setResource(resource);
     components.add(c);
     service.setComponents(components);
-    final Response actual = apiServer.updateService("no-jenkins",
+    final Response actual = apiServer.updateService(request, "no-jenkins",
         service);
     assertEquals("start service is ", actual.getStatus(),
-        Response.status(Status.INTERNAL_SERVER_ERROR).build()
+        Response.status(Status.BAD_REQUEST).build()
             .getStatus());
   }
 
@@ -276,7 +283,7 @@ public class TestApiServer {
     c.setResource(resource);
     components.add(c);
     service.setComponents(components);
-    final Response actual = apiServer.updateService("jenkins",
+    final Response actual = apiServer.updateService(request, "jenkins",
         service);
     assertEquals("start service is ", actual.getStatus(),
         Response.status(Status.OK).build().getStatus());
@@ -303,7 +310,7 @@ public class TestApiServer {
     components.add(c);
     service.setComponents(components);
     System.out.println("before stop");
-    final Response actual = apiServer.updateService("no-jenkins",
+    final Response actual = apiServer.updateService(request, "no-jenkins",
         service);
     assertEquals("stop service is ", actual.getStatus(),
         Response.status(Status.BAD_REQUEST).build().getStatus());
@@ -330,7 +337,7 @@ public class TestApiServer {
     components.add(c);
     service.setComponents(components);
     System.out.println("before stop");
-    final Response actual = apiServer.updateService("jenkins",
+    final Response actual = apiServer.updateService(request, "jenkins",
         service);
     assertEquals("stop service is ", actual.getStatus(),
         Response.status(Status.OK).build().getStatus());
@@ -357,10 +364,10 @@ public class TestApiServer {
     components.add(c);
     service.setComponents(components);
     System.out.println("before stop");
-    final Response actual = apiServer.updateService("no-jenkins",
+    final Response actual = apiServer.updateService(request, "no-jenkins",
         service);
     assertEquals("update service is ", actual.getStatus(),
-        Response.status(Status.INTERNAL_SERVER_ERROR)
+        Response.status(Status.BAD_REQUEST)
             .build().getStatus());
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestApiServiceClient.java
new file mode 100644 (file)
index 0000000..ffd9328
--- /dev/null
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*;
+
+/**
+ * Test case for CLI to API Service.
+ *
+ */
+public class TestApiServiceClient {
+  private static ApiServiceClient asc;
+  private static ApiServiceClient badAsc;
+  private static Server server;
+
+  /**
+   * A mocked version of API Service for testing purpose.
+   *
+   */
+  @SuppressWarnings("serial")
+  public static class TestServlet extends HttpServlet {
+
+    @Override
+    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+      System.out.println("Get was called");
+      resp.setStatus(HttpServletResponse.SC_OK);
+    }
+
+    @Override
+    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+      resp.setStatus(HttpServletResponse.SC_OK);
+    }
+
+    @Override
+    protected void doPut(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+      resp.setStatus(HttpServletResponse.SC_OK);
+    }
+
+    @Override
+    protected void doDelete(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+      resp.setStatus(HttpServletResponse.SC_OK);
+    }
+
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    server = new Server(8088);
+    ((QueuedThreadPool)server.getThreadPool()).setMaxThreads(10);
+    ServletContextHandler context = new ServletContextHandler();
+    context.setContextPath("/app");
+    server.setHandler(context);
+    context.addServlet(new ServletHolder(TestServlet.class), "/*");
+    ((ServerConnector)server.getConnectors()[0]).setHost("localhost");
+    server.start();
+
+    Configuration conf = new Configuration();
+    conf.set("yarn.resourcemanager.webapp.address",
+        "localhost:8088");
+    asc = new ApiServiceClient();
+    asc.serviceInit(conf);
+
+    Configuration conf2 = new Configuration();
+    conf2.set("yarn.resourcemanager.webapp.address",
+        "localhost:8089");
+    badAsc = new ApiServiceClient();
+    badAsc.serviceInit(conf2);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    server.stop();
+  }
+
+  @Test
+  public void testLaunch() {
+    String fileName = "target/test-classes/example-app.json";
+    String appName = "example-app";
+    long lifetime = 3600L;
+    String queue = "default";
+    try {
+      int result = asc.actionLaunch(fileName, appName, lifetime, queue);
+      assertEquals(EXIT_SUCCESS, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testBadLaunch() {
+    String fileName = "unknown_file";
+    String appName = "unknown_app";
+    long lifetime = 3600L;
+    String queue = "default";
+    try {
+      int result = badAsc.actionLaunch(fileName, appName, lifetime, queue);
+      assertEquals(EXIT_EXCEPTION_THROWN, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testStop() {
+    String appName = "example-app";
+    try {
+      int result = asc.actionStop(appName);
+      assertEquals(EXIT_SUCCESS, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testBadStop() {
+    String appName = "unknown_app";
+    try {
+      int result = badAsc.actionStop(appName);
+      assertEquals(EXIT_EXCEPTION_THROWN, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testStart() {
+    String appName = "example-app";
+    try {
+      int result = asc.actionStart(appName);
+      assertEquals(EXIT_SUCCESS, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testBadStart() {
+    String appName = "unknown_app";
+    try {
+      int result = badAsc.actionStart(appName);
+      assertEquals(EXIT_EXCEPTION_THROWN, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testSave() {
+    String fileName = "target/test-classes/example-app.json";
+    String appName = "example-app";
+    long lifetime = 3600L;
+    String queue = "default";
+    try {
+      int result = asc.actionSave(fileName, appName, lifetime, queue);
+      assertEquals(EXIT_SUCCESS, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testBadSave() {
+    String fileName = "unknown_file";
+    String appName = "unknown_app";
+    long lifetime = 3600L;
+    String queue = "default";
+    try {
+      int result = badAsc.actionSave(fileName, appName, lifetime, queue);
+      assertEquals(EXIT_EXCEPTION_THROWN, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testFlex() {
+    String appName = "example-app";
+    HashMap<String, String> componentCounts = new HashMap<String, String>();
+    try {
+      int result = asc.actionFlex(appName, componentCounts);
+      assertEquals(EXIT_SUCCESS, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testBadFlex() {
+    String appName = "unknown_app";
+    HashMap<String, String> componentCounts = new HashMap<String, String>();
+    try {
+      int result = badAsc.actionFlex(appName, componentCounts);
+      assertEquals(EXIT_EXCEPTION_THROWN, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testDestroy() {
+    String appName = "example-app";
+    try {
+      int result = asc.actionDestroy(appName);
+      assertEquals(EXIT_SUCCESS, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+  @Test
+  public void testBadDestroy() {
+    String appName = "unknown_app";
+    try {
+      int result = badAsc.actionDestroy(appName);
+      assertEquals(EXIT_EXCEPTION_THROWN, result);
+    } catch (IOException | YarnException e) {
+      fail();
+    }
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/example-app.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/example-app.json
new file mode 100644 (file)
index 0000000..5dfbd64
--- /dev/null
@@ -0,0 +1,15 @@
+{
+  "name": "example-app",
+  "components" :
+  [
+    {
+      "name": "simple",
+      "number_of_containers": 1,
+      "launch_command": "sleep 2",
+      "resource": {
+        "cpus": 1,
+        "memory": "128"
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/log4j.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/log4j.properties
new file mode 100644 (file)
index 0000000..81a3f6a
--- /dev/null
@@ -0,0 +1,19 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
index 2bcf68b..af7c542 100644 (file)
@@ -32,6 +32,7 @@ import javax.xml.bind.annotation.XmlEnum;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlType;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonValue;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -49,6 +50,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
+@JsonInclude(JsonInclude.Include.NON_NULL)
 public class ReadinessCheck implements Serializable {
   private static final long serialVersionUID = -3836839816887186801L;
 
index b8034b3..ee6e681 100644 (file)
@@ -987,7 +987,9 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     GetStatusResponseProto response =
         amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
     appSpec = jsonSerDeser.fromJson(response.getStatus());
-
+    if (lifetime != null) {
+      appSpec.setLifetime(lifetime.getRemainingTime());
+    }
     return appSpec;
   }
 
index 7b22e3e..2c27ea7 100644 (file)
@@ -60,10 +60,13 @@ public class JsonSerDeser<T> {
    * Create an instance bound to a specific type
    * @param classType class type
    */
+  @SuppressWarnings("deprecation")
   public JsonSerDeser(Class<T> classType) {
     this.classType = classType;
     this.mapper = new ObjectMapper();
     mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    mapper.configure(SerializationConfig.Feature.WRITE_NULL_MAP_VALUES, false);
+    mapper.configure(SerializationConfig.Feature.WRITE_NULL_PROPERTIES, false);
   }
 
   public JsonSerDeser(Class<T> classType, PropertyNamingStrategy namingStrategy) {
index 78670e2..5067ffc 100644 (file)
@@ -275,10 +275,6 @@ public class TestYarnNativeServices extends ServiceTestUtils {
     }
   }
 
-  private void checkRegistryAndCompDirDeleted() {
-
-  }
-
   private void checkEachCompInstancesInOrder(Component component) {
     long expectedNumInstances = component.getNumberOfContainers();
     Assert.assertEquals(expectedNumInstances, component.getContainers().size());
@@ -294,32 +290,6 @@ public class TestYarnNativeServices extends ServiceTestUtils {
     }
   }
 
-  private void waitForOneCompToBeReady(ServiceClient client,
-      Service exampleApp, String readyComp)
-      throws TimeoutException, InterruptedException {
-    long numExpectedContainers =
-        exampleApp.getComponent(readyComp).getNumberOfContainers();
-    GenericTestUtils.waitFor(() -> {
-      try {
-        Service retrievedApp = client.getStatus(exampleApp.getName());
-        Component retrievedComp = retrievedApp.getComponent(readyComp);
-
-        if (retrievedComp.getContainers() != null
-            && retrievedComp.getContainers().size() == numExpectedContainers) {
-          LOG.info(readyComp + " found " + numExpectedContainers
-              + " containers running");
-          return true;
-        } else {
-          LOG.info(" Waiting for " + readyComp + "'s containers to be running");
-          return false;
-        }
-      } catch (Exception e) {
-        e.printStackTrace();
-        return false;
-      }
-    }, 2000, 200000);
-  }
-
   /**
    * Wait until all the containers for all components become ready state.
    *
index 1f4581e..6d5bb20 100644 (file)
@@ -60,7 +60,7 @@ public class TestBuildExternalComponents {
   private void buildAndCheckComponents(String appName, String appDef,
       SliderFileSystem sfs, Set<String> names) throws Throwable {
     AppAdminClient client = AppAdminClient.createAppAdminClient(AppAdminClient
-        .DEFAULT_TYPE, conf);
+        .UNIT_TEST_TYPE, conf);
     client.actionSave(ExampleAppJson.resourceName(appDef), null, null,
         null);
 
index df4b1df..a95818f 100644 (file)
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.client;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.client.cli.ApplicationCLI;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.service.api.records.Component;
@@ -61,15 +62,20 @@ public class TestServiceCLI {
   }
 
   private void buildApp(String serviceName, String appDef) throws Throwable {
-    String[] args = {"app", "-D", basedirProp, "-save", serviceName,
-        ExampleAppJson.resourceName(appDef)};
+    String[] args = {"app",
+        "-D", basedirProp, "-save", serviceName,
+        ExampleAppJson.resourceName(appDef),
+        "-appTypes", AppAdminClient.UNIT_TEST_TYPE};
     runCLI(args);
   }
 
   private void buildApp(String serviceName, String appDef, String lifetime,
       String queue) throws Throwable {
-    String[] args = {"app", "-D", basedirProp, "-save", serviceName,
-        ExampleAppJson.resourceName(appDef), "-updateLifetime", lifetime,
+    String[] args = {"app",
+        "-D", basedirProp, "-save", serviceName,
+        ExampleAppJson.resourceName(appDef),
+        "-appTypes", AppAdminClient.UNIT_TEST_TYPE,
+        "-updateLifetime", lifetime,
         "-changeQueue", queue};
     runCLI(args);
   }
index 55be13b..a09663e 100644 (file)
@@ -39,6 +39,9 @@ public abstract class AppAdminClient extends CompositeService {
       ".application.admin.client.class.";
   public static final String DEFAULT_TYPE = "yarn-service";
   public static final String DEFAULT_CLASS_NAME = "org.apache.hadoop.yarn" +
+      ".service.client.ApiServiceClient";
+  public static final String UNIT_TEST_TYPE = "unit-test";
+  public static final String UNIT_TEST_CLASS_NAME = "org.apache.hadoop.yarn" +
       ".service.client.ServiceClient";
 
   @Private
@@ -64,6 +67,9 @@ public abstract class AppAdminClient extends CompositeService {
     if (!clientClassMap.containsKey(DEFAULT_TYPE)) {
       clientClassMap.put(DEFAULT_TYPE, DEFAULT_CLASS_NAME);
     }
+    if (!clientClassMap.containsKey(UNIT_TEST_TYPE)) {
+      clientClassMap.put(UNIT_TEST_TYPE, UNIT_TEST_CLASS_NAME);
+    }
     if (!clientClassMap.containsKey(appType)) {
       throw new IllegalArgumentException("App admin client class name not " +
           "specified for type " + appType);
index 471b4d6..daca296 100644 (file)
@@ -322,6 +322,9 @@ public class ApplicationCLI extends YarnCLI {
             System.err.println("Application with name '" + appIdOrName
                 + "' doesn't exist in RM or Timeline Server.");
             return -1;
+          } catch (Exception ie) {
+            System.err.println(ie.getMessage());
+            return -1;
           }
         }
       } else if (title.equalsIgnoreCase(APPLICATION_ATTEMPT)) {
index d3ad53e..7364445 100644 (file)
@@ -82,6 +82,7 @@ public class WebApps {
       public Class<? extends HttpServlet> clazz;
       public String name;
       public String spec;
+      public Map<String, String> params;
     }
     
     final String name;
@@ -147,7 +148,19 @@ public class WebApps {
       servlets.add(struct);
       return this;
     }
-    
+
+    public Builder<T> withServlet(String name, String pathSpec,
+        Class<? extends HttpServlet> servlet,
+        Map<String, String> params) {
+      ServletStruct struct = new ServletStruct();
+      struct.clazz = servlet;
+      struct.name = name;
+      struct.spec = pathSpec;
+      struct.params = params;
+      servlets.add(struct);
+      return this;
+    }
+
     public Builder<T> with(Configuration conf) {
       this.conf = conf;
       return this;
@@ -243,6 +256,11 @@ public class WebApps {
           pathList.add("/" + wsName + "/*");
         }
       }
+      for (ServletStruct s : servlets) {
+        if (!pathList.contains(s.spec)) {
+          pathList.add(s.spec);
+        }
+      }
       if (conf == null) {
         conf = new Configuration();
       }
@@ -315,7 +333,12 @@ public class WebApps {
         HttpServer2 server = builder.build();
 
         for(ServletStruct struct: servlets) {
-          server.addServlet(struct.name, struct.spec, struct.clazz);
+          if (struct.params != null) {
+            server.addInternalServlet(struct.name, struct.spec,
+                struct.clazz, struct.params);
+          } else {
+            server.addServlet(struct.name, struct.spec, struct.clazz);
+          }
         }
         for(Map.Entry<String, Object> entry : attributes.entrySet()) {
           server.setAttribute(entry.getKey(), entry.getValue());
@@ -394,22 +417,16 @@ public class WebApps {
     }
 
     public WebApp start(WebApp webapp) {
-      return start(webapp, null, null);
+      return start(webapp, null);
     }
 
-    public WebApp start(WebApp webapp, WebAppContext ui2Context,
-        Map<String, String> services) {
+    public WebApp start(WebApp webapp, WebAppContext ui2Context) {
       WebApp webApp = build(webapp);
       HttpServer2 httpServer = webApp.httpServer();
       if (ui2Context != null) {
         addFiltersForNewContext(ui2Context);
         httpServer.addHandlerAtFront(ui2Context);
       }
-      if (services!=null) {
-        String packageName = services.get("PackageName");
-        String pathSpec = services.get("PathSpec");
-        httpServer.addJerseyResourcePackage(packageName, pathSpec);
-      }
       try {
         httpServer.start();
         LOG.info("Web app " + name + " started at "
index ff6e8aa..521d8a9 100644 (file)
@@ -99,7 +99,7 @@ public class RegistrySecurity extends AbstractService {
    * Access policy options
    */
   private enum AccessPolicy {
-    anon, sasl, digest
+    anon, sasl, digest, simple
   }
 
   /**
@@ -214,6 +214,9 @@ public class RegistrySecurity extends AbstractService {
     case REGISTRY_CLIENT_AUTH_ANONYMOUS:
       access = AccessPolicy.anon;
       break;
+    case REGISTRY_CLIENT_AUTH_SIMPLE:
+      access = AccessPolicy.simple;
+      break;
     default:
       throw new ServiceStateException(E_UNKNOWN_AUTHENTICATION_MECHANISM
                                       + "\"" + auth + "\"");
@@ -302,6 +305,7 @@ public class RegistrySecurity extends AbstractService {
           break;
 
         case anon:
+        case simple:
           // nothing is needed; account is read only.
           if (LOG.isDebugEnabled()) {
             LOG.debug("Auth is anonymous");
@@ -758,6 +762,9 @@ public class RegistrySecurity extends AbstractService {
           LOG.info(
               "Enabling ZK sasl client: jaasClientEntry = " + jaasClientEntry
                   + ", principal = " + principal + ", keytab = " + keytab);
+        default:
+          clearZKSaslClientProperties();
+          break;
       }
     }
   }
index 8641842..7bbf4aa 100644 (file)
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.sun.jersey.spi.container.servlet.ServletContainer;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.curator.framework.AuthInfo;
@@ -1049,11 +1051,23 @@ public class ResourceManager extends CompositeService implements Recoverable {
     RMWebAppUtil.setupSecurityAndFilters(conf,
         getClientRMService().rmDTSecretManager);
 
+    Map<String, String> params = new HashMap<String, String>();
+    if (getConfig().getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
+        false)) {
+      String apiPackages = "org.apache.hadoop.yarn.service.webapp;" +
+          "org.apache.hadoop.yarn.webapp";
+      params.put("com.sun.jersey.config.property.resourceConfigClass",
+          "com.sun.jersey.api.core.PackagesResourceConfig");
+      params.put("com.sun.jersey.config.property.packages", apiPackages);
+    }
+
     Builder<ApplicationMasterService> builder = 
         WebApps
             .$for("cluster", ApplicationMasterService.class, masterService,
                 "ws")
             .with(conf)
+            .withServlet("API-Service", "/app/*",
+                ServletContainer.class, params)
             .withHttpSpnegoPrincipalKey(
                 YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY)
             .withHttpSpnegoKeytabKey(
@@ -1109,15 +1123,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
       }
     }
 
-    if (getConfig().getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
-        false)) {
-      serviceConfig = new HashMap<String, String>();
-      String apiPackages = "org.apache.hadoop.yarn.service.webapp;" +
-          "org.apache.hadoop.yarn.webapp";
-      serviceConfig.put("PackageName", apiPackages);
-      serviceConfig.put("PathSpec", "/app/*");
-    }
-    webApp = builder.start(new RMWebApp(this), uiWebAppContext, serviceConfig);
+    webApp = builder.start(new RMWebApp(this), uiWebAppContext);
   }
 
   private String getWebAppsPath(String appName) {