fixed mesos classpath
author     Yaniv Rodenski <roadan@gmail.com>
           Sat, 28 Jul 2018 06:36:57 +0000 (16:36 +1000)
committer  Yaniv Rodenski <roadan@gmail.com>
           Sat, 28 Jul 2018 06:36:57 +0000 (16:36 +1000)
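
Builds the Mesos launcher classpath from every file under bin/ instead of a
single *-all.jar glob, points the Spark download at the permanent Apache
archive, and threads a new callbackAddress parameter through
RunnerSetupProvider.getCommand and its Spark implementations (the YARN
ApplicationMaster passes its address; the Mesos scheduler passes an empty
string for now).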
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
leader/src/main/scripts/ama-start-mesos.sh
sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java

diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index 06362ee..04280ce 100644
@@ -12,7 +12,7 @@ class PySparkRunnerProvider extends RunnerSetupProvider {
 
   private var conf: ClusterConfig = _
 
-  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String): String = conf.mode match {
+  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match {
     case "mesos" => s"""env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}""".stripMargin
     case "yarn" => "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && " +
       s"/bin/bash spark/bin/load-spark-env.sh && " +
@@ -21,7 +21,7 @@ class PySparkRunnerProvider extends RunnerSetupProvider {
       "-Dscala.usejavacp=true " +
       "-Dhdp.version=2.6.1.0-129 " +
       "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
-      s"'$jobId' '${conf.master}' '${actionData.name}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId'" +
+      s"'$jobId' '${conf.master}' '${actionData.name}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " +
       s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
       s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
     case _ => ""
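
The Mesos branch above is a single long interpolated string (the trailing
.stripMargin is a no-op, since the string contains no | margin characters).
Decomposed, it is equivalent to the following sketch; the helper name and
layout are invented for readability, but every value mirrors an interpolation
in the original line:

    // Sketch only: the Mesos branch of getCommand, decomposed. The helper
    // name mesosCommand is illustrative; it is not part of this commit.
    private def mesosCommand(jobId: String, actionData: ActionData): String = {
      val amaNode  = sys.env("AMA_NODE")
      val sparkUri = s"http://$amaNode:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz"

      // The executor, runner and runtime jars plus the Spark jars directory,
      // joined into a single -cp argument.
      val classpath = Seq(
        s"executor-${conf.version}-all.jar",
        s"spark-runner-${conf.version}-all.jar",
        s"spark-runtime-${conf.version}.jar",
        s"spark-${conf.Webserver.sparkVersion}/jars/*"
      ).mkString(":")

      s"env AMA_NODE=$amaNode " +
      "env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so " +
      s"env SPARK_EXECUTOR_URI=$sparkUri " +
      s"java -cp $classpath -Dscala.usejavacp=true -Djava.library.path=/usr/lib " +
      "org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor " +
      s"$jobId ${conf.master} ${actionData.name}"
    }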
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
index bd7dee1..dcc75e6 100644
@@ -28,7 +28,7 @@ class SparkScalaRunnerProvider extends RunnerSetupProvider {
 
   private var conf: ClusterConfig = _
 
-  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String): String = conf.mode match {
+  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match {
     case "mesos" => s"""env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}""".stripMargin
     case "yarn" => s"/bin/bash spark/bin/load-spark-env.sh && " +
       s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/ " +
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index 407e679..a6c8306 100755
@@ -171,9 +171,11 @@ class JobScheduler extends AmaterasuScheduler {
                 val execData = DataLoader.getExecutorDataBytes(env, config)
                 val executorId = taskId.getValue + "-" + UUID.randomUUID()
                 //creating the command
+
+                println(s"===> ${runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, "")}")
                 val command = CommandInfo
                   .newBuilder
-                  .setValue(runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId))
+                  .setValue(runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, ""))
                   .addUris(URI.newBuilder
                     .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar")
                     .setExecutable(false)
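
(Note that on Mesos the new callbackAddress is passed as an empty string for
now; only the YARN launch path below supplies a real address.)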
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index 5c717d3..e28d99f 100644
@@ -246,7 +246,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
         val framework = frameworkFactory.getFramework(actionData.groupId)
         val runnerProvider = framework.getRunnerProvider(actionData.typeId)
         val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-        val commands: List[String] = List(runnerProvider.getCommand(jobManager.jobId, actionData, env, s"${actionData.id}-${container.getId.getContainerId}"))
+        val commands: List[String] = List(runnerProvider.getCommand(jobManager.jobId, actionData, env, s"${actionData.id}-${container.getId.getContainerId}", address))
 
         log.info("Running container id {}.", container.getId.getContainerId)
         log.info("Running container id {} with command '{}'", container.getId.getContainerId, commands.last)
@@ -279,7 +279,6 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
           "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark-version-info.properties"))),
           "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark_intp.py"))))
 
-
         //adding the framework and executor resources
         setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
         setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.typeId}", resources, s"${framework.getGroupIdentifier}-${actionData.typeId}")
diff --git a/leader/src/main/scripts/ama-start-mesos.sh b/leader/src/main/scripts/ama-start-mesos.sh
index 4a1f164..0047b38 100755
@@ -94,8 +94,13 @@ case $i in
 esac
 done
 
+CP=""
+for filename in $BASEDIR/bin/*; do
+    CP+=$filename":"
+done
+
 echo "repo: ${REPO} "
-CMD="java -cp ${BASEDIR}/bin/*-all.jar -Djava.library.path=/usr/lib org.apache.amaterasu.leader.mesos.MesosJobLauncher --home ${BASEDIR}"
+CMD="java -cp ${CP} -Djava.library.path=/usr/lib org.apache.amaterasu.leader.mesos.MesosJobLauncher --home ${BASEDIR}"
 
 if [ -n "$REPO" ]; then
     CMD+=" --repo ${REPO}"
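
The classpath change is the fix named in the commit title. The old
java -cp ${BASEDIR}/bin/*-all.jar only works while the glob matches exactly
one jar: as soon as bin/ holds two or more *-all.jar files, the shell expands
the glob into several space-separated arguments and the java command line no
longer parses. The loop sidesteps globbing entirely by joining every file
under bin/ with ':' into the single CP argument. The trailing ':' it leaves
behind adds an empty classpath entry, which java treats as the working
directory; that is typically harmless here.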
@@ -124,7 +129,7 @@ fi
 if ! ls ${BASEDIR}/dist/spark*.tgz 1> /dev/null 2>&1; then
     echo "${bold} Fetching spark distributable ${NC}"
     #wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist
-    wget http://apache.mirror.digitalpacific.com.au/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist
+    wget https://archive.apache.org/dist/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist
 fi
 if [ ! -f ${BASEDIR}/dist/miniconda.sh ]; then
     echo "${bold}Fetching miniconda distributable ${NC}"
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java
index 875c3d2..fe4086d 100644
@@ -20,7 +20,7 @@ import org.apache.amaterasu.common.dataobjects.ActionData;
 
 public interface RunnerSetupProvider {
 
-    String getCommand(String jobId, ActionData actionData, String env, String executorId);
+    String getCommand(String jobId, ActionData actionData, String env, String executorId, String callbackAddress);
 
     String[] getRunnerResources();
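
For downstream framework authors, the practical effect of this interface
change is that every provider must now accept the extra argument. A minimal
sketch of a conforming implementation follows; the class and its echo command
are invented for illustration, and any interface members not shown in this
hunk are ignored:

    import org.apache.amaterasu.common.dataobjects.ActionData
    import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider

    // Sketch only: a hypothetical provider against the new five-argument
    // signature. Not part of this commit.
    class EchoRunnerProvider extends RunnerSetupProvider {

      override def getCommand(jobId: String, actionData: ActionData, env: String,
                              executorId: String, callbackAddress: String): String =
        // The callback address rides along as one more launcher argument, just
        // as the YARN ActionsExecutorLauncher invocation above appends
        // '$callbackAddress' to its command line.
        s"echo '$jobId ${actionData.name} $env $executorId $callbackAddress'"

      override def getRunnerResources(): Array[String] = Array.empty
    }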