AMATERASU-51: Add support for configuring the path for action level environment path
authorEran Bartenstein <ebarten@gmail.com>
Thu, 8 Nov 2018 07:58:29 +0000 (18:58 +1100)
committerEran Bartenstein <ebarten@gmail.com>
Thu, 8 Nov 2018 07:58:29 +0000 (18:58 +1100)
Pull latest from apache master and merge latest changes

1  2 
executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala

@@@ -183,83 -188,98 +187,98 @@@ class JobScheduler extends AmaterasuSch
              var executor: ExecutorInfo = null
              val slaveId = offer.getSlaveId.getValue
              slavesExecutors.synchronized {
-               if (slavesExecutors.contains(slaveId) &&
-                 offer.getExecutorIdsList.contains(slavesExecutors(slaveId).getExecutorId)) {
-                 executor = slavesExecutors(slaveId)
-               }
-               else {
-                 val execData = DataLoader.getExecutorDataBytes(env, config)
-                 val executorId = taskId.getValue + "-" + UUID.randomUUID()
-                 //creating the command
-                 println(s"===> ${runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, "")}")
-                 val command = CommandInfo
-                   .newBuilder
-                   .setValue(runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, ""))
-                   .addUris(URI.newBuilder
-                     .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar")
-                     .setExecutable(false)
-                     .setExtract(false)
-                     .build())
+               //              if (slavesExecutors.contains(slaveId) &&
+               //                offer.getExecutorIdsList.contains(slavesExecutors(slaveId).getExecutorId)) {
+               //                executor = slavesExecutors(slaveId)
+               //              }
+               //              else {
+               val execData = DataLoader.getExecutorDataBytes(env, config)
+               val executorId = taskId.getValue + "-" + UUID.randomUUID()
+               //creating the command
+               // TODO: move this into the runner provider somehow
+               copy(get(s"repo/src/${actionData.src}"), get(s"dist/${jobManager.jobId}/${actionData.name}/${actionData.src}"), REPLACE_EXISTING)
+               println(s"===> ${runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, "")}")
+               val command = CommandInfo
+                 .newBuilder
+                 .setValue(runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, ""))
+                 .addUris(URI.newBuilder
+                   .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar")
+                   .setExecutable(false)
+                   .setExtract(false)
+                   .build())
  
 -              // Getting env.yaml
 -              command.addUris(URI.newBuilder
 -                .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/env.yaml")
 -                .setExecutable(false)
 -                .setExtract(true)
 -                .build())
 +                // Getting env.yaml
 +                command.addUris(URI.newBuilder
 +                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/env.yaml")
 +                  .setExecutable(false)
 +                  .setExtract(true)
 +                  .build())
  
 -              // Getting datastores.yaml
 -              command.addUris(URI.newBuilder
 -                .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/datastores.yaml")
 -                .setExecutable(false)
 -                .setExtract(true)
 -                .build())
 +                // Getting datastores.yaml
 +                command.addUris(URI.newBuilder
 +                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/datastores.yaml")
 +                  .setExecutable(false)
 +                  .setExtract(true)
 +                  .build())
  
 -              // Getting runtime.yaml
 -              command.addUris(URI.newBuilder
 -                .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/runtime.yaml")
 -                .setExecutable(false)
 -                .setExtract(true)
 -                .build())
 +                // Getting runtime.yaml
 +                command.addUris(URI.newBuilder
 +                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/runtime.yaml")
 +                  .setExecutable(false)
 +                  .setExtract(true)
 +                  .build())
  
-                 // Getting framework resources
-                 frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
-                   .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${f.getName}")
-                   .setExecutable(false)
-                   .setExtract(true)
-                   .build()))
-                 // Getting running resources
-                 runnerProvider.getRunnerResources.foreach(r => command.addUris(URI.newBuilder
-                   .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$r")
+               // Getting framework resources
+               frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
+                 .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${f.getName}")
+                 .setExecutable(false)
+                 .setExtract(true)
+                 .build()))
+               // Getting runner resources
+               runnerProvider.getRunnerResources.foreach(r => command.addUris(URI.newBuilder
+                 .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$r")
+                 .setExecutable(false)
+                 .setExtract(false)
+                 .build()))
+               // Getting action specific resources
+               runnerProvider.getActionResources(jobManager.jobId, actionData).foreach(r => command.addUris(URI.newBuilder
+                 .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$r")
+                 .setExecutable(false)
+                 .setExtract(false)
+                 .build()))
+               command
+                 .addUris(URI.newBuilder()
+                   .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side
+                   .setExecutable(true)
+                   .setExtract(false)
+                   .build())
+                 .addUris(URI.newBuilder()
+                   .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties")
                    .setExecutable(false)
                    .setExtract(false)
-                   .build()))
-                 command
-                   .addUris(URI.newBuilder()
-                     .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side
-                     .setExecutable(true)
-                     .setExtract(false)
-                     .build())
-                   .addUris(URI.newBuilder()
-                     .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties")
-                     .setExecutable(false)
-                     .setExtract(false)
-                     .build())
-                 executor = ExecutorInfo
-                   .newBuilder
-                   .setData(ByteString.copyFrom(execData))
-                   .setName(taskId.getValue)
-                   .setExecutorId(ExecutorID.newBuilder().setValue(executorId))
-                   .setCommand(command)
-                   .build()
-                 slavesExecutors.put(offer.getSlaveId.getValue, executor)
-               }
+                   .build())
+               // setting the processes environment variables
+               val envVarsList = frameworkProvider.getEnvironmentVariables.asScala.toList.map(x => Variable.newBuilder().setName(x._1).setValue(x._2).build()).asJava
+               command.setEnvironment(Environment.newBuilder().addAllVariables(envVarsList))
+               executor = ExecutorInfo
+                 .newBuilder
+                 .setData(ByteString.copyFrom(execData))
+                 .setName(taskId.getValue)
+                 .setExecutorId(ExecutorID.newBuilder().setValue(executorId))
+                 .setCommand(command)
+                 .build()
+               slavesExecutors.put(offer.getSlaveId.getValue, executor)
              }
+             //}
  
              val driverConfiguration = frameworkProvider.getDriverConfiguration