added spark-shell provider
author    Yaniv Rodenski <roadan@gmail.com>
          Sun, 5 Aug 2018 10:57:34 +0000 (20:57 +1000)
committer Yaniv Rodenski <roadan@gmail.com>
          Sun, 5 Aug 2018 10:57:34 +0000 (20:57 +1000)
executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala [new file with mode: 0644]
leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java

diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
index 90c2001..fff2a81 100755 (executable)
@@ -94,6 +94,7 @@ class MesosActionsExecutor extends Executor with Logging {
       .setTaskId(taskInfo.getTaskId)
       .setState(TaskState.TASK_STARTING).build()
     driver.sendStatusUpdate(status)
+
     val task = Future {
 
       val taskData = mapper.readValue(new ByteArrayInputStream(taskInfo.getData.toByteArray), classOf[TaskData])
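
Editor's note: the hunk above only adds a blank line, but the surrounding launchTask body is the standard Mesos status-update pattern: build a TaskStatus, report TASK_STARTING to the master, then run the action asynchronously. A minimal sketch of that pattern (the helper name and factoring are illustrative, not the executor's actual code):

import org.apache.mesos.{ExecutorDriver, Protos}
import org.apache.mesos.Protos.{TaskState, TaskStatus}

// Report a task state transition back to the Mesos master.
def reportState(driver: ExecutorDriver, taskInfo: Protos.TaskInfo, state: TaskState): Unit = {
  val status = TaskStatus.newBuilder()
    .setTaskId(taskInfo.getTaskId)
    .setState(state)
    .build()
  driver.sendStatusUpdate(status)
}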
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
index ac442d5..e0a6f8d 100644 (file)
 package org.apache.amaterasu.frameworks.spark.dispatcher
 
 import java.io.File
+import java.util
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers.{PySparkRunnerProvider, SparkScalaRunnerProvider}
+import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers._
 import org.apache.amaterasu.leader.common.utilities.{DataLoader, MemoryFormatParser}
 import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
 import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, RunnerSetupProvider}
 
 import scala.collection.mutable
+import collection.JavaConversions._
 
 class SparkSetupProvider extends FrameworkSetupProvider {
 
@@ -50,19 +52,24 @@ class SparkSetupProvider extends FrameworkSetupProvider {
     this.conf = conf
 
     runnerProviders += ("scala" -> SparkScalaRunnerProvider(conf))
+    runnerProviders += ("scala-shell" -> SparkShellScalaRunnerProvider(conf))
     runnerProviders += ("pyspark" -> PySparkRunnerProvider(conf))
 
   }
 
   override def getGroupIdentifier: String = "spark"
 
-  override def getGroupResources: Array[File] = {
-
-    conf.mode match {
+  override def getGroupResources: Array[File] = conf.mode match {
       case "mesos" => Array[File](new File(s"spark-${conf.Webserver.sparkVersion}.tgz"), new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"))
       case "yarn" => new File(conf.spark.home).listFiles
       case _ => Array[File]()
     }
+
+
+  override def getEnvironmentVariables: util.Map[String, String] = conf.mode match {
+    case "mesos" => Map[String, String](s"SPARK_HOME" -> s"spark-${conf.Webserver.sparkVersion}")
+    case "yarn" => Map[String, String]("SPARK_HOME" -> "spark")we e
+    case _ => Map[String, String]()
   }
 
   override def getDriverConfiguration: DriverConfiguration = {
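
Editor's note: the new getEnvironmentVariables returns a Scala Map where the SDK interface expects java.util.Map, relying on the implicit JavaConversions import added above. The same mapping with an explicit JavaConverters conversion, as an equivalent sketch (not what the commit ships):

import java.util
import scala.collection.JavaConverters._

def environmentFor(mode: String, sparkVersion: String): util.Map[String, String] = mode match {
  case "mesos" => Map("SPARK_HOME" -> s"spark-$sparkVersion").asJava
  case "yarn"  => Map("SPARK_HOME" -> "spark").asJava
  case _       => Map.empty[String, String].asJava
}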
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala
new file mode 100644 (file)
index 0000000..dd9428b
--- /dev/null
@@ -0,0 +1,26 @@
+package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+
+class SparkShellScalaRunnerProvider extends RunnerSetupProvider {
+
+  private var conf: ClusterConfig = _
+
+  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = {
+
+    println(s"===> $$SPARK_HOME/bin/spark-shell ${actionData.src} --jars spark-runtime-${conf.version}.jar")
+    s"$$SPARK_HOME/bin/spark-shell ${actionData.src} --jars spark-runtime-${conf.version}.jar"
+  }
+
+  override def getRunnerResources = ???
+}
+
+object SparkShellScalaRunnerProvider {
+  def apply(conf: ClusterConfig): SparkShellScalaRunnerProvider = {
+    val result = new SparkShellScalaRunnerProvider
+    result.conf = conf
+    result
+  }
+}
\ No newline at end of file
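
Editor's note: the new provider follows the same companion-object factory convention as the other runner providers: a no-arg class with a private mutable conf field that apply populates. A self-contained sketch of the pattern, with a stand-in config type replacing ClusterConfig:

// Stand-in for org.apache.amaterasu.common.configuration.ClusterConfig.
case class Config(version: String)

class ShellProvider {
  private var conf: Config = _

  // Mirrors the committed command: $$ escapes to a literal $SPARK_HOME.
  def command(src: String): String =
    s"$$SPARK_HOME/bin/spark-shell $src --jars spark-runtime-${conf.version}.jar"
}

object ShellProvider {
  def apply(conf: Config): ShellProvider = {
    val p = new ShellProvider
    p.conf = conf
    p
  }
}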
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
index aba6210..c271f50 100755 (executable)
@@ -83,13 +83,11 @@ object JobParser {
     * @param previous the previous action, this is used in order to add the current action
     *                 to the nextActionIds
     */
-  def parseActions(
-    actions: Seq[JsonNode],
-    manager: JobManager,
-    actionsQueue: BlockingQueue[ActionData],
-    attempts: Int,
-    previous: Action
-  ): Unit = {
+  def parseActions(actions: Seq[JsonNode],
+                   manager: JobManager,
+                   actionsQueue: BlockingQueue[ActionData],
+                   attempts: Int,
+                   previous: Action): Unit = {
 
     if (actions.isEmpty)
       return
@@ -106,8 +104,8 @@ object JobParser {
 
     //updating the list of frameworks setup
     manager.frameworks.getOrElseUpdate(action.data.groupId,
-                                       new mutable.HashSet[String]())
-                                       .add(action.data.typeId)
+      new mutable.HashSet[String]())
+      .add(action.data.typeId)
 
 
     if (manager.head == null)
@@ -143,34 +141,28 @@ object JobParser {
 
   }
 
-  def parseSequentialAction(
-    action: JsonNode,
-    jobId: String,
-    actionsQueue: BlockingQueue[ActionData],
-    client: CuratorFramework,
-    attempts: Int
-  ): SequentialAction = {
+  def parseSequentialAction(action: JsonNode,
+                            jobId: String,
+                            actionsQueue: BlockingQueue[ActionData],
+                            client: CuratorFramework,
+                            attempts: Int): SequentialAction = {
 
-    SequentialAction(
-      action.path("name").asText,
+    SequentialAction(action.path("name").asText,
       action.path("file").asText,
       action.path("runner").path("group").asText,
       action.path("runner").path("type").asText,
-      action.path("exports").fields().asScala.toSeq.map(e=> (e.getKey, e.getValue.asText())).toMap,
+      action.path("exports").fields().asScala.toSeq.map(e => (e.getKey, e.getValue.asText())).toMap,
       jobId,
       actionsQueue,
       client,
-      attempts
-    )
+      attempts)
   }
 
-  def parseErrorAction(
-    action: JsonNode,
-    jobId: String,
-    parent: String,
-    actionsQueue: BlockingQueue[ActionData],
-    client: CuratorFramework
-  ): SequentialAction = {
+  def parseErrorAction(action: JsonNode,
+                       jobId: String,
+                       parent: String,
+                       actionsQueue: BlockingQueue[ActionData],
+                       client: CuratorFramework): SequentialAction = {
 
     ErrorAction(
       action.path("name").asText,
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index a6c8306..710e6bb 100755 (executable)
@@ -37,6 +37,7 @@ import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.log4j.LogManager
 import org.apache.mesos.Protos.CommandInfo.URI
+import org.apache.mesos.Protos.Environment.Variable
 import org.apache.mesos.Protos._
 import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Protos, SchedulerDriver}
@@ -207,12 +208,18 @@ class JobScheduler extends AmaterasuScheduler {
                     .setExecutable(false)
                     .setExtract(false)
                     .build())
+
+                // setting the processes environment variables
+                val envVarsList = frameworkProvider.getEnvironmentVariables.asScala.toList.map(x=> Variable.newBuilder().setName(x._1).setValue(x._2).build()).asJava
+                command.setEnvironment(Environment.newBuilder().addAllVariables(envVarsList))
+
                 executor = ExecutorInfo
                   .newBuilder
                   .setData(ByteString.copyFrom(execData))
                   .setName(taskId.getValue)
                   .setExecutorId(ExecutorID.newBuilder().setValue(executorId))
                   .setCommand(command)
+
                   .build()
 
                 slavesExecutors.put(offer.getSlaveId.getValue, executor)
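
Editor's note: the new env-var block is the bridge between the SDK's java.util.Map and the Mesos protobuf API: each entry becomes an Environment.Variable attached to the executor's CommandInfo. Condensed into a standalone sketch (factored into a helper for clarity; the scheduler inlines it):

import java.util
import org.apache.mesos.Protos.{CommandInfo, Environment}
import org.apache.mesos.Protos.Environment.Variable
import scala.collection.JavaConverters._

def withEnvironment(command: CommandInfo.Builder,
                    vars: util.Map[String, String]): CommandInfo.Builder = {
  val variables = vars.asScala.map { case (name, value) =>
    Variable.newBuilder().setName(name).setValue(value).build()
  }.asJava
  command.setEnvironment(Environment.newBuilder().addAllVariables(variables))
}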
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
index 07a28b1..99393f6 100644 (file)
@@ -20,6 +20,7 @@ import org.apache.amaterasu.common.configuration.ClusterConfig;
 import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration;
 
 import java.io.File;
+import java.util.Map;
 
 public interface FrameworkSetupProvider {
 
@@ -33,4 +34,6 @@ public interface FrameworkSetupProvider {
 
     RunnerSetupProvider getRunnerProvider(String runnerId);
 
+    Map<String, String> getEnvironmentVariables();
+
 }
\ No newline at end of file
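
Editor's note: with this change, every FrameworkSetupProvider implementation must now supply getEnvironmentVariables. A provider with no framework-specific environment can simply return an empty map; sketched below as the method body only, with the rest of the implementing Scala class elided:

import java.util
import java.util.Collections

// Inside a FrameworkSetupProvider implementation:
override def getEnvironmentVariables: util.Map[String, String] =
  Collections.emptyMap[String, String]()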