added getActionResources and getActionDependencies
authorYaniv Rodenski <roadan@gmail.com>
Sun, 21 Oct 2018 00:54:14 +0000 (11:54 +1100)
committerYaniv Rodenski <roadan@gmail.com>
Sun, 21 Oct 2018 00:54:14 +0000 (11:54 +1100)
1  2 
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala
leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java

@@@ -31,10 -31,10 +31,14 @@@ class PySparkRunnerProvider extends Run
      case _ => ""
    }
  
--  override def getRunnerResources: Array[String] = {
++  override def getRunnerResources: Array[String] =
      Array[String]("miniconda.sh", "spark_intp.py", "runtime.py", "codegen.py")
--  }
  
++  def getActionResources: Array[String] =
++    Array[String]()
++
++  override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
++    Array[String]()
  }
  
  object PySparkRunnerProvider {
@@@ -32,9 -32,9 +32,9 @@@ class SparkScalaRunnerProvider extends 
    override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match {
      case "mesos" =>
        s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz " +
--      s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* " +
--      s"-Dscala.usejavacp=true -Djava.library.path=$libPath " +
--      s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}".stripMargin
++        s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* " +
++        s"-Dscala.usejavacp=true -Djava.library.path=$libPath " +
++        s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}".stripMargin
      case "yarn" => s"/bin/bash spark/bin/load-spark-env.sh && " +
        s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/ " +
        "-Xmx2G " +
      case _ => ""
    }
  
--  override def getRunnerResources: Array[String] = {
++  override def getRunnerResources: Array[String] =
      Array[String]()
--  }
  
++
++  def getActionResources: Array[String] =
++    Array[String]()
++
++  override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
++    Array[String]()
  }
  
  object SparkScalaRunnerProvider {
index 709ce0f,0000000..00091e7
mode 100644,000000..100644
--- /dev/null
@@@ -1,26 -1,0 +1,30 @@@
-   override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = {
-     println(s"===> $$SPARK_HOME/bin/spark-shell ${actionData.src} --jars spark-runtime-${conf.version}.jar")
 +package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
 +
 +import org.apache.amaterasu.common.configuration.ClusterConfig
 +import org.apache.amaterasu.common.dataobjects.ActionData
 +import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
 +
 +class SparkShellScalaRunnerProvider extends RunnerSetupProvider {
 +
 +  private var conf: ClusterConfig = _
 +
-   }
++  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String =
 +    s"$$SPARK_HOME/bin/spark-shell ${actionData.src} --jars spark-runtime-${conf.version}.jar"
-   override def getRunnerResources: Array[String] = Array[String]()
 +
++  override def getRunnerResources: Array[String] =
++    Array[String]()
++
++  def getActionResources: Array[String] =
++    Array[String]()
++
++  override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
++  Array[String](s"$jobId/${actionData.name}/${actionData.src}")
 +}
 +
 +object SparkShellScalaRunnerProvider {
 +  def apply(conf: ClusterConfig): SparkShellScalaRunnerProvider = {
 +    val result = new SparkShellScalaRunnerProvider
 +    result.conf = conf
 +    result
 +  }
 +}
   */
  package org.apache.amaterasu.leader.mesos.schedulers
  
+ import java.io.{File, PrintWriter, StringWriter}
++import java.nio.file.Files.copy
++import java.nio.file.Paths.get
++import java.nio.file.StandardCopyOption.REPLACE_EXISTING
  import java.util
  import java.util.concurrent.locks.ReentrantLock
  import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
@@@ -164,67 -184,83 +188,98 @@@ class JobScheduler extends AmaterasuSch
              var executor: ExecutorInfo = null
              val slaveId = offer.getSlaveId.getValue
              slavesExecutors.synchronized {
--              if (slavesExecutors.contains(slaveId) &&
--                offer.getExecutorIdsList.contains(slavesExecutors(slaveId).getExecutorId)) {
--                executor = slavesExecutors(slaveId)
--              }
--              else {
--                val execData = DataLoader.getExecutorDataBytes(env, config)
--                val executorId = taskId.getValue + "-" + UUID.randomUUID()
--                //creating the command
--
--                println(s"===> ${runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, "")}")
--                val command = CommandInfo
--                  .newBuilder
--                  .setValue(runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, ""))
--                  .addUris(URI.newBuilder
--                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar")
--                    .setExecutable(false)
--                    .setExtract(false)
--                    .build())
--
-                 // Getting framework resources
-                 frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
-                   .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${f.getName}")
 -                // Getting env.yaml
 -                command.addUris(URI.newBuilder
 -                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/env.yaml")
++              //              if (slavesExecutors.contains(slaveId) &&
++              //                offer.getExecutorIdsList.contains(slavesExecutors(slaveId).getExecutorId)) {
++              //                executor = slavesExecutors(slaveId)
++              //              }
++              //              else {
++              val execData = DataLoader.getExecutorDataBytes(env, config)
++              val executorId = taskId.getValue + "-" + UUID.randomUUID()
++              //creating the command
++
++              // TODO: move this into the runner provider somehow
++              copy(get(s"repo/src/${actionData.src}"), get(s"dist/${jobManager.jobId}/${actionData.name}/${actionData.src}"), REPLACE_EXISTING)
++
++              println(s"===> ${runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, "")}")
++              val command = CommandInfo
++                .newBuilder
++                .setValue(runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, ""))
++                .addUris(URI.newBuilder
++                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar")
                    .setExecutable(false)
--                  .setExtract(true)
-                   .build()))
-                 // Getting running resources
-                 runnerProvider.getRunnerResources.foreach(r => command.addUris(URI.newBuilder
-                   .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$r")
++                  .setExtract(false)
+                   .build())
 -                // Getting datastores.yaml
 -                command.addUris(URI.newBuilder
 -                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/datastores.yaml")
 -                  .setExecutable(false)
 -                  .setExtract(true)
++              // Getting env.yaml
++              command.addUris(URI.newBuilder
++                .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/env.yaml")
++                .setExecutable(false)
++                .setExtract(true)
++                .build())
++
++              // Getting datastores.yaml
++              command.addUris(URI.newBuilder
++                .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/datastores.yaml")
++                .setExecutable(false)
++                .setExtract(true)
++                .build())
++
++              // Getting runtime.yaml
++              command.addUris(URI.newBuilder
++                .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/runtime.yaml")
++                .setExecutable(false)
++                .setExtract(true)
++                .build())
++
++              // Getting framework resources
++              frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
++                .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${f.getName}")
++                .setExecutable(false)
++                .setExtract(true)
++                .build()))
++
++              // Getting runner resources
++              runnerProvider.getRunnerResources.foreach(r => command.addUris(URI.newBuilder
++                .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$r")
++                .setExecutable(false)
++                .setExtract(false)
++                .build()))
++
++              // Getting action specific resources
++              runnerProvider.getActionResources(jobManager.jobId, actionData).foreach(r => command.addUris(URI.newBuilder
++                .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$r")
++                .setExecutable(false)
++                .setExtract(false)
++                .build()))
++
++              command
++                .addUris(URI.newBuilder()
++                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side
++                  .setExecutable(true)
++                  .setExtract(false)
+                   .build())
 -
 -                // Getting runtime.yaml
 -                command.addUris(URI.newBuilder
 -                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/runtime.yaml")
++                .addUris(URI.newBuilder()
++                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties")
                    .setExecutable(false)
 -                  .setExtract(true)
 +                  .setExtract(false)
-                   .build()))
-                 command
-                   .addUris(URI.newBuilder()
-                     .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh")
-                     .setExecutable(true)
-                     .setExtract(false)
-                     .build())
-                   .addUris(URI.newBuilder()
-                     .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties")
-                     .setExecutable(false)
-                     .setExtract(false)
-                     .build())
-                 // setting the processes environment variables
-                 val envVarsList = frameworkProvider.getEnvironmentVariables.asScala.toList.map(x=> Variable.newBuilder().setName(x._1).setValue(x._2).build()).asJava
-                 command.setEnvironment(Environment.newBuilder().addAllVariables(envVarsList))
-                 executor = ExecutorInfo
-                   .newBuilder
-                   .setData(ByteString.copyFrom(execData))
-                   .setName(taskId.getValue)
-                   .setExecutorId(ExecutorID.newBuilder().setValue(executorId))
-                   .setCommand(command)
-                   .build()
-                 slavesExecutors.put(offer.getSlaveId.getValue, executor)
-               }
+                   .build())
 -                // Getting framework resources
 -                frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
 -                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${f.getName}")
 -                  .setExecutable(false)
 -                  .setExtract(true)
 -                  .build()))
++              // setting the processes environment variables
++              val envVarsList = frameworkProvider.getEnvironmentVariables.asScala.toList.map(x => Variable.newBuilder().setName(x._1).setValue(x._2).build()).asJava
++              command.setEnvironment(Environment.newBuilder().addAllVariables(envVarsList))
 -                // Getting running resources
 -                runnerProvider.getRunnerResources.foreach(r => command.addUris(URI.newBuilder
 -                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$r")
 -                  .setExecutable(false)
 -                  .setExtract(false)
 -                  .build()))
 -
 -                command
 -                  .addUris(URI.newBuilder()
 -                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side
 -                    .setExecutable(true)
 -                    .setExtract(false)
 -                    .build())
 -                  .addUris(URI.newBuilder()
 -                    .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties")
 -                    .setExecutable(false)
 -                    .setExtract(false)
 -                    .build())
 -
 -                executor = ExecutorInfo
 -                  .newBuilder
 -                  .setData(ByteString.copyFrom(execData))
 -                  .setName(taskId.getValue)
 -                  .setExecutorId(ExecutorID.newBuilder().setValue(executorId))
 -                  .setCommand(command)
 -                  .build()
 -
 -                slavesExecutors.put(offer.getSlaveId.getValue, executor)
 -              }
++              executor = ExecutorInfo
++                .newBuilder
++                .setData(ByteString.copyFrom(execData))
++                .setName(taskId.getValue)
++                .setExecutorId(ExecutorID.newBuilder().setValue(executorId))
++                .setCommand(command)
++
++                .build()
++
++              slavesExecutors.put(offer.getSlaveId.getValue, executor)
              }
++            //}
  
              val driverConfiguration = frameworkProvider.getDriverConfiguration
  
@@@ -34,6 -33,6 +34,8 @@@ public interface FrameworkSetupProvide
  
      RunnerSetupProvider getRunnerProvider(String runnerId);
  
 +    Map<String, String> getEnvironmentVariables();
 +
+     String[] getConfigurationItems();
  }
@@@ -24,4 -24,4 +24,8 @@@ public interface RunnerSetupProvider 
  
      String[] getRunnerResources();
  
++    String[] getActionResources();
++
++    String[] getActionDependencies(String jobId, ActionData actionData);
++
  }