Spark runner providers added
author Yaniv Rodenski <roadan@gmail.com>
Thu, 26 Jul 2018 07:33:46 +0000 (17:33 +1000)
committer Yaniv Rodenski <roadan@gmail.com>
Thu, 26 Jul 2018 07:33:46 +0000 (17:33 +1000)
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala [new file with mode: 0644]
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala [new file with mode: 0644]
leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java [new file with mode: 0644]

diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
index 5e26e45..d3be430 100644 (file)
@@ -19,50 +19,51 @@ package org.apache.amaterasu.frameworks.spark.dispatcher
 import java.io.File
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers.{PySparkRunnerProvider, SparkScalaRunnerProvider}
 import org.apache.amaterasu.leader.common.utilities.{DataLoader, MemoryFormatParser}
-import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
 import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
-
-import scala.collection.mutable
+import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, RunnerSetupProvider}
 
 import scala.collection.mutable
 
 class SparkSetupProvider extends FrameworkSetupProvider {
 
-
   private var env: String = _
   private var conf: ClusterConfig = _
-  private val runnersResources = mutable.Map[String, Array[File]]()
-  //private var execData: ExecData = _
   private lazy val sparkExecConfigurations: mutable.Map[String, Any] = loadSparkConfig
 
+  private val runnerProviders: mutable.Map[String, RunnerSetupProvider] = mutable.Map[String, RunnerSetupProvider]()
+
   private def loadSparkConfig: mutable.Map[String, Any] = {
+
     val execData = DataLoader.getExecutorData(env, conf)
-    val sparkExecConfigurationsurations = execData.configurations.get("spark")
-    if (sparkExecConfigurationsurations.isEmpty) {
-      throw new Exception(s"Spark configuration files could not be loaded for the environment ${env}")
+    val sparkExecConfiguration = execData.configurations.get("spark")
+    if (sparkExecConfiguration.isEmpty) {
+      throw new Exception(s"Spark configuration files could not be loaded for the environment $env")
     }
-    collection.mutable.Map(sparkExecConfigurationsurations.get.toSeq: _*)
+    collection.mutable.Map(sparkExecConfiguration.get.toSeq: _*)
+
   }
 
   override def init(env: String, conf: ClusterConfig): Unit = {
     this.env = env
     this.conf = conf
 
-    runnersResources += "scala" -> Array.empty[File]
-    runnersResources += "sql" -> Array.empty[File]
-    //TODO: Nadav needs to setup conda here
-    runnersResources += "python" -> Array.empty[File]
+    runnerProviders += ("scala" -> SparkScalaRunnerProvider(conf))
+    runnerProviders += ("pyspark" -> PySparkRunnerProvider(conf))
+
   }
 
   override def getGroupIdentifier: String = "spark"
 
   override def getGroupResources: Array[File] = {
-    new File(conf.spark.home).listFiles
-  }
 
-  override def getRunnerResources(runnerId: String): Array[File] = {
-    runnersResources(runnerId)
+    println(s"===> mode ${conf.mode}")
+    conf.mode match {
+      case "mesos" => Array[File](new File("spark-2.2.1-bin-hadoop2.7.tgz"), new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"))
+      case "yarn" => new File(conf.spark.home).listFiles
+      case _ => Array[File]()
+    }
   }
 
   override def getDriverConfiguration: DriverConfiguration = {
@@ -99,4 +100,8 @@ class SparkSetupProvider extends FrameworkSetupProvider {
 
     new DriverConfiguration(mem, cpu)
   }
+
+  override def getRunnerProvider(runnerId: String): RunnerSetupProvider = {
+    runnerProviders(runnerId)
+  }
 }
\ No newline at end of file
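
For orientation, a minimal sketch of the dispatch flow the new runnerProviders map enables. The ids "scala" and "pyspark" are the ones registered in init above; config and actionData are placeholders assumed to be supplied by the caller:

    import org.apache.amaterasu.common.configuration.ClusterConfig
    import org.apache.amaterasu.common.dataobjects.ActionData
    import org.apache.amaterasu.frameworks.spark.dispatcher.SparkSetupProvider

    // hypothetical caller-side sketch, not part of the commit
    def buildLaunchCommand(config: ClusterConfig, actionData: ActionData): String = {
      val framework = new SparkSetupProvider()
      framework.init("test", config)                     // env name + cluster config
      val runner = framework.getRunnerProvider("scala")  // or "pyspark"
      runner.getCommand("job-0001", actionData, "test", "exec-0001")
    }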
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
new file mode 100644 (file)
index 0000000..06362ee
--- /dev/null
@@ -0,0 +1,42 @@
+package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
+
+import java.net.URLEncoder
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.leader.common.utilities.DataLoader
+import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+import org.apache.hadoop.yarn.api.ApplicationConstants
+
+class PySparkRunnerProvider extends RunnerSetupProvider {
+
+  private var conf: ClusterConfig = _
+
+  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String): String = conf.mode match {
+    case "mesos" => s"""env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}""".stripMargin
+    case "yarn" => "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && " +
+      s"/bin/bash spark/bin/load-spark-env.sh && " +
+      s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/ " +
+      "-Xmx2G " +
+      "-Dscala.usejavacp=true " +
+      "-Dhdp.version=2.6.1.0-129 " +
+      "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
+      s"'$jobId' '${conf.master}' '${actionData.name}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId'" +
+      s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
+      s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
+    case _ => ""
+  }
+
+  override def getRunnerResources: Array[String] = {
+    Array[String]("miniconda.sh", "spark_intp.py", "runtime.py", "codegen.py")
+  }
+
+}
+
+object PySparkRunnerProvider {
+  def apply(conf: ClusterConfig): PySparkRunnerProvider = {
+    val result = new PySparkRunnerProvider
+    result.conf = conf
+    result
+  }
+}
\ No newline at end of file
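
A short usage sketch for the provider above; the companion object's apply stands in for a constructor, so the class itself keeps a no-arg shape. Here conf is assumed to be loaded elsewhere:

    import org.apache.amaterasu.common.configuration.ClusterConfig
    import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers.PySparkRunnerProvider

    // sketch: list the extra files the leader's web server must serve for PySpark
    def listPySparkResources(conf: ClusterConfig): Unit = {
      val provider = PySparkRunnerProvider(conf)
      provider.getRunnerResources.foreach(println)  // miniconda.sh, spark_intp.py, ...
    }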
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
new file mode 100644 (file)
index 0000000..bd7dee1
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
+
+import java.net.URLEncoder
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.leader.common.utilities.DataLoader
+import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+import org.apache.hadoop.yarn.api.ApplicationConstants
+
+class SparkScalaRunnerProvider extends RunnerSetupProvider {
+
+  private var conf: ClusterConfig = _
+
+  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String): String = conf.mode match {
+    case "mesos" => s"""env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}""".stripMargin
+    case "yarn" => s"/bin/bash spark/bin/load-spark-env.sh && " +
+      s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/ " +
+      "-Xmx2G " +
+      "-Dscala.usejavacp=true " +
+      "-Dhdp.version=2.6.1.0-129 " +
+      "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
+      s"'$jobId' '${conf.master}' '${actionData.name}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId'" +
+      s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
+      s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
+    case _ => ""
+  }
+
+  override def getRunnerResources: Array[String] = {
+    Array[String]()
+  }
+
+}
+
+object SparkScalaRunnerProvider {
+  def apply(conf: ClusterConfig): SparkScalaRunnerProvider = {
+    val result = new SparkScalaRunnerProvider
+    result.conf = conf
+    result
+  }
+}
\ No newline at end of file
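
Both providers build the same Mesos command and a near-identical YARN command (PySpark only prepends the miniconda bootstrap). A hypothetical refactor sketch, not part of this commit, of the shared YARN portion; all names here are invented for illustration:

    import java.net.URLEncoder
    import org.apache.amaterasu.common.configuration.ClusterConfig
    import org.apache.amaterasu.common.dataobjects.ActionData
    import org.apache.amaterasu.leader.common.utilities.DataLoader
    import org.apache.hadoop.yarn.api.ApplicationConstants

    object YarnCommandBuilder {
      // bootstrap lets PySpark prepend "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && "
      def build(conf: ClusterConfig, jobId: String, actionData: ActionData,
                env: String, executorId: String, bootstrap: String = ""): String =
        bootstrap +
          "/bin/bash spark/bin/load-spark-env.sh && " +
          s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/ " +
          "-Xmx2G -Dscala.usejavacp=true -Dhdp.version=2.6.1.0-129 " +
          "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
          s"'$jobId' '${conf.master}' '${actionData.name}' " +
          s"'${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' " +
          s"'${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' " +
          s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
          s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
    }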
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index 1bb82ff..407e679 100755 (executable)
@@ -154,9 +154,12 @@ class JobScheduler extends AmaterasuScheduler {
             val slaveActions = executionMap(offer.getSlaveId.toString)
             slaveActions.put(taskId.getValue, ActionStatus.started)
 
+            val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
+            val frameworkProvider = frameworkFactory.providers(actionData.groupId)
+            val runnerProvider = frameworkProvider.getRunnerProvider(actionData.typeId)
+
            // searching for an executor that already exists on the slave; if none exists
            // we create a new one
-            //TODO: move to .getOrElseUpdate when migrting to scala 2.11
             var executor: ExecutorInfo = null
             val slaveId = offer.getSlaveId.getValue
             slavesExecutors.synchronized {
@@ -166,52 +169,35 @@ class JobScheduler extends AmaterasuScheduler {
               }
               else {
                 val execData = DataLoader.getExecutorDataBytes(env, config)
-
+                val executorId = taskId.getValue + "-" + UUID.randomUUID()
+                // creating the command
                 val command = CommandInfo
                   .newBuilder
-                  .setValue(
-                    s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz java -cp executor-${config.version}-all.jar:spark-runner-${config.version}-all.jar:spark-runtime-${config.version}.jar:spark-${config.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor ${jobManager.jobId} ${config.master} ${actionData.name}""".stripMargin
-                  )
-                  //                  HttpServer.getFilesInDirectory(sys.env("AMA_NODE"), config.Webserver.Port).foreach(f=>
-                  //                  )
-                  .addUris(URI.newBuilder
-                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar")
-                  .setExecutable(false)
-                  .setExtract(false)
-                  .build())
-                  .addUris(URI.newBuilder
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark-runner-${config.version}-all.jar")
-                    .setExecutable(false)
-                    .setExtract(false)
-                    .build())
+                  .setValue(runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId))
                   .addUris(URI.newBuilder
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark-runtime-${config.version}.jar")
+                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar")
                     .setExecutable(false)
                     .setExtract(false)
                     .build())
-                  .addUris(URI.newBuilder()
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark-2.2.1-bin-hadoop2.7.tgz")
-                    .setExecutable(false)
-                    .setExtract(true)
-                    .build())
+
+                // Getting framework resources
+                frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${f.getName}")
+                  .setExecutable(false)
+                  .setExtract(true)
+                  .build()))
+
+                // Getting runner resources
+                runnerProvider.getRunnerResources.foreach(r => command.addUris(URI.newBuilder
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$r")
+                  .setExecutable(false)
+                  .setExtract(false)
+                  .build()))
+
+                command
                   .addUris(URI.newBuilder()
                     .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh")
-                    .setExecutable(false)
-                    .setExtract(false)
-                    .build())
-                  .addUris(URI.newBuilder()
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark_intp.py")
-                    .setExecutable(false)
-                    .setExtract(false)
-                    .build())
-                  .addUris(URI.newBuilder()
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/runtime.py")
-                    .setExecutable(false)
-                    .setExtract(false)
-                    .build())
-                  .addUris(URI.newBuilder()
-                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/codegen.py")
-                    .setExecutable(false)
+                    .setExecutable(true)
                     .setExtract(false)
                     .build())
                   .addUris(URI.newBuilder()
@@ -223,7 +209,7 @@ class JobScheduler extends AmaterasuScheduler {
                   .newBuilder
                   .setData(ByteString.copyFrom(execData))
                   .setName(taskId.getValue)
-                  .setExecutorId(ExecutorID.newBuilder().setValue(taskId.getValue + "-" + UUID.randomUUID()))
+                  .setExecutorId(ExecutorID.newBuilder().setValue(executorId))
                   .setCommand(command)
                   .build()
 
@@ -231,8 +217,6 @@ class JobScheduler extends AmaterasuScheduler {
               }
             }
 
-            val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
-            val frameworkProvider = frameworkFactory.providers(actionData.groupId)
             val driverConfiguration = frameworkProvider.getDriverConfiguration
 
             val actionTask = TaskInfo
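
The hard-coded URI list is gone: each resource reported by the framework or runner provider becomes a Mesos fetcher URI served by the leader's web server. A sketch of that mapping (ClusterConfig supplied by the caller; per the diff, group resources such as the Spark tgz are extracted, runner files are not):

    import org.apache.amaterasu.common.configuration.ClusterConfig
    import org.apache.mesos.Protos.CommandInfo

    // sketch: one fetcher URI per served file; extract = true for archives
    def toUri(config: ClusterConfig, fileName: String, extract: Boolean): CommandInfo.URI =
      CommandInfo.URI.newBuilder()
        .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$fileName")
        .setExecutable(false)
        .setExtract(extract)
        .build()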
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index 1f1aa25..5c717d3 100644 (file)
@@ -17,7 +17,7 @@
 package org.apache.amaterasu.leader.yarn
 
 import java.io.{File, FileInputStream, InputStream}
-import java.net.{InetAddress, ServerSocket, URLEncoder}
+import java.net.{InetAddress, ServerSocket}
 import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
@@ -28,7 +28,6 @@ import org.apache.activemq.broker.BrokerService
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.common.utilities.DataLoader
 import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory
 import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
 import org.apache.amaterasu.leader.utilities.{ActiveReportListener, Args}
@@ -38,7 +37,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
@@ -244,22 +242,11 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
       val actionData = actionsBuffer.poll()
       val containerTask = Future[ActionData] {
 
-        val taskData = DataLoader.getTaskDataString(actionData, env)
-        val execData = DataLoader.getExecutorDataString(env, config)
-
+        val frameworkFactory = FrameworkProvidersFactory(env, config)
+        val framework = frameworkFactory.getFramework(actionData.groupId)
+        val runnerProvider = framework.getRunnerProvider(actionData.typeId)
         val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-        val commands: List[String] = List(
-          "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && ",
-          s"/bin/bash spark/bin/load-spark-env.sh && ",
-          s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${config.YARN.hadoopHomeDir}/conf/ " +
-            "-Xmx2G " +
-            "-Dscala.usejavacp=true " +
-            "-Dhdp.version=2.6.1.0-129 " +
-            "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
-            s"'${jobManager.jobId}' '${config.master}' '${actionData.name}' '${URLEncoder.encode(taskData, "UTF-8")}' '${URLEncoder.encode(execData, "UTF-8")}' '${actionData.id}-${container.getId.getContainerId}' '$address' " +
-            s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
-            s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
-        )
+        val commands: List[String] = List(runnerProvider.getCommand(jobManager.jobId, actionData, env, s"${actionData.id}-${container.getId.getContainerId}"))
 
         log.info("Running container id {}.", container.getId.getContainerId)
         log.info("Running container id {} with command '{}'", container.getId.getContainerId, commands.last)
@@ -292,8 +279,6 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
           "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark-version-info.properties"))),
           "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark_intp.py"))))
 
-        val frameworkFactory = FrameworkProvidersFactory(env, config)
-        val framework = frameworkFactory.getFramework(actionData.groupId)
 
         //adding the framework and executor resources
         setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
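
With the inlined command removed, the container wiring reduces to roughly the following sketch (ctx, runnerProvider, jobManager, actionData, env and container are the surrounding fields and locals shown in the diff):

    import scala.collection.JavaConverters._

    // sketch: one provider-built command replaces the hard-coded list
    val launchCommand = runnerProvider.getCommand(
      jobManager.jobId, actionData, env, s"${actionData.id}-${container.getId.getContainerId}")
    ctx.setCommands(List(launchCommand).asJava)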
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
index ef53fa9..07a28b1 100644 (file)
@@ -29,8 +29,8 @@ public interface FrameworkSetupProvider {
 
     File[] getGroupResources();
 
-    File[] getRunnerResources(String runnerId);
-
     DriverConfiguration getDriverConfiguration();
 
+    RunnerSetupProvider getRunnerProvider(String runnerId);
+
 }
\ No newline at end of file
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java
new file mode 100644 (file)
index 0000000..875c3d2
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.sdk.frameworks;
+
+import org.apache.amaterasu.common.dataobjects.ActionData;
+
+public interface RunnerSetupProvider {
+
+    String getCommand(String jobId, ActionData actionData, String env, String executorId);
+
+    String[] getRunnerResources();
+
+}
\ No newline at end of file
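
For SDK users, a minimal hypothetical implementation of the new interface in Scala (not part of the commit): getCommand returns the container launch line, and getRunnerResources names any extra files the leader must serve alongside the executor jars:

    import org.apache.amaterasu.common.dataobjects.ActionData
    import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider

    // hypothetical example runner provider
    class EchoRunnerProvider extends RunnerSetupProvider {
      override def getCommand(jobId: String, actionData: ActionData,
                              env: String, executorId: String): String =
        s"echo 'running ${actionData.name} ($jobId/$executorId) in $env'"

      override def getRunnerResources: Array[String] = Array.empty[String]
    }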