AMATERASU-51: Add support for configuring the path for action level environment path
authorEran Bartenstein <eran.bartenstein@nab.com.au>
Wed, 31 Oct 2018 02:37:34 +0000 (13:37 +1100)
committerEran Bartenstein <eran.bartenstein@nab.com.au>
Wed, 31 Oct 2018 02:37:34 +0000 (13:37 +1100)
Resolve further compilation issues with scala to Kotlin conversion
CODE still does NOT compile.

leader/build.gradle
leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/SequentialAction.scala
leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala

index 0450748..25f3cbc 100644 (file)
@@ -17,7 +17,7 @@
 plugins {
     id "com.github.johnrengelman.shadow" version "1.2.4"
     id 'com.github.maiflai.scalatest' version '0.22'
-    id 'kotlin'
+    id 'org.jetbrains.kotlin.jvm'
     id 'scala'
     id 'java'
 
@@ -63,6 +63,8 @@ dependencies {
     compile group: 'org.reflections', name: 'reflections', version: '0.9.11'
     compile group: 'org.apache.activemq', name: 'activemq-broker', version: '5.15.3'
     compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
+    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
+    compile "org.jetbrains.kotlin:kotlin-reflect"
     runtime group: 'org.apache.activemq', name: 'activemq-kahadb-store', version: '5.15.3'
 
     testCompile project(':common')
@@ -83,8 +85,11 @@ sourceSets {
 
     // this is done so Scala will compile before Java
     main {
+        kotlin {
+            srcDirs = ['src/main/kotlin']
+        }
         scala {
-            srcDirs = ['src/main/kotlin', 'src/main/scala', 'src/main/java']
+            srcDirs = ['src/main/kotlin','src/main/java', 'src/main/scala']
         }
         java {
             srcDirs = []
@@ -110,4 +115,11 @@ task copyToHomeBin(type: Copy) {
 task copyToHome() {
     dependsOn copyToHomeRoot
     dependsOn copyToHomeBin
-}
\ No newline at end of file
+}
+
+compileKotlin{
+    kotlinOptions.jvmTarget = "1.8"
+}
+compileTestKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
index 7a0c605..b368a83 100755 (executable)
@@ -131,7 +131,7 @@ object JobParser {
         manager.client
       )
 
-      action.data.errorActionId = errorAction.data.id
+      action.data.errorActionId = errorAction.data.getId
       manager.registerAction(errorAction)
 
       //updating the list of frameworks setup
index 5d7f41c..ea9b3ba 100755 (executable)
@@ -16,6 +16,7 @@
  */
 package org.apache.amaterasu.leader.execution.actions
 
+import java.util
 import java.util.concurrent.BlockingQueue
 
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
@@ -23,6 +24,7 @@ import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 class SequentialAction extends Action {
@@ -55,7 +57,7 @@ class SequentialAction extends Action {
     attempt += 1
 
     if (attempt <= attempts) {
-      data.id
+      data.getId
     }
     else {
       announceFailure()
@@ -91,7 +93,8 @@ object SequentialAction {
 
     action.attempts = attempts
     action.jobId = jobId
-    action.data = ActionData(ActionStatus.pending, name, src, groupId, typeId, action.actionId, exports, new ListBuffer[String])
+    val javaExports = exports.asJava
+    action.data = new ActionData(ActionStatus.pending, name, src, groupId, typeId, action.actionId, javaExports, new util.ArrayList[String]())
     action.jobsQueue = queue
     action.client = zkClient
 
@@ -121,7 +124,7 @@ object ErrorAction {
     action.actionId = action.actionPath.substring(action.actionPath.indexOf('-') + 1).replace("/", "-")
 
     action.jobId = jobId
-    action.data = ActionData(ActionStatus.pending, name, src, groupId, typeId, action.actionId, Map.empty, new ListBuffer[String])
+    action.data = new ActionData(ActionStatus.pending, name, src, groupId, typeId, action.actionId, new util.HashMap[String, String](), new util.ArrayList[String]())
     action.jobsQueue = queue
     action.client = zkClient
 
index d68ae77..ac2a188 100755 (executable)
@@ -27,7 +27,6 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.configuration.enums.ActionStatus.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel
 import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType}
@@ -152,19 +151,19 @@ class JobScheduler extends AmaterasuScheduler {
         try {
           val actionData = jobManager.getNextActionData
           if (actionData != null) {
-            val taskId = Protos.TaskID.newBuilder().setValue(actionData.id).build()
+            val taskId = Protos.TaskID.newBuilder().setValue(actionData.getId).build()
 
             // setting up the configuration files for the container
-            val envYaml = configManager.getActionConfigContent(actionData.name, "") //TODO: replace with the value in actionData.config
-            writeConfigFile(envYaml, jobManager.jobId, actionData.name, "env.yaml")
+            val envYaml = configManager.getActionConfigContent(actionData.getName, "") //TODO: replace with the value in actionData.config
+            writeConfigFile(envYaml, jobManager.jobId, actionData.getName, "env.yaml")
 
             val dataStores = DataLoader.getTaskData(actionData, env).exports
             val writer = new StringWriter()
             yamlMapper.writeValue(writer, dataStores)
             val dataStoresYaml = writer.toString
-            writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.name, "datastores.yaml")
+            writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.getName, "datastores.yaml")
 
-            writeConfigFile(s"jobId: ${jobManager.jobId}\nactionName: ${actionData.name}", jobManager.jobId, actionData.name, "runtime.yaml")
+            writeConfigFile(s"jobId: ${jobManager.jobId}\nactionName: ${actionData.getName}", jobManager.jobId, actionData.getName, "runtime.yaml")
 
             offersToTaskIds.put(offer.getId.getValue, taskId.getValue)
 
@@ -176,8 +175,8 @@ class JobScheduler extends AmaterasuScheduler {
             slaveActions.put(taskId.getValue, ActionStatus.started)
 
 
-            val frameworkProvider = frameworkFactory.providers(actionData.groupId)
-            val runnerProvider = frameworkProvider.getRunnerProvider(actionData.typeId)
+            val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
+            val runnerProvider = frameworkProvider.getRunnerProvider(actionData.getTypeId)
 
             // searching for an executor that already exist on the slave, if non exist
             // we create a new one
@@ -205,21 +204,21 @@ class JobScheduler extends AmaterasuScheduler {
 
                 // Getting env.yaml
                 command.addUris(URI.newBuilder
-                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/env.yaml")
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/env.yaml")
                   .setExecutable(false)
                   .setExtract(true)
                   .build())
 
                 // Getting datastores.yaml
                 command.addUris(URI.newBuilder
-                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/datastores.yaml")
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/datastores.yaml")
                   .setExecutable(false)
                   .setExtract(true)
                   .build())
 
                 // Getting runtime.yaml
                 command.addUris(URI.newBuilder
-                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/runtime.yaml")
+                  .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/runtime.yaml")
                   .setExecutable(false)
                   .setExtract(true)
                   .build())
index 8c11cc2..2870ef7 100644 (file)
@@ -170,7 +170,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
       val actionData = jobManager.getNextActionData
       if (actionData != null) {
 
-        val frameworkProvider = frameworkFactory.providers(actionData.groupId)
+        val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
         val driverConfiguration = frameworkProvider.getDriverConfiguration
 
         var mem: Int = driverConfiguration.getMemory
@@ -221,14 +221,14 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
   private def askContainer(actionData: ActionData): Unit = {
 
     actionsBuffer.add(actionData)
-    log.info(s"About to ask container for action ${actionData.id}. Action buffer size is: ${actionsBuffer.size()}")
+    log.info(s"About to ask container for action ${actionData.getId}. Action buffer size is: ${actionsBuffer.size()}")
 
     // we have an action to schedule, let's request a container
     val priority: Priority = Records.newRecord(classOf[Priority])
     priority.setPriority(1)
     val containerReq = new ContainerRequest(capability, null, null, priority)
     rmClient.addContainerRequest(containerReq)
-    log.info(s"Asked container for action ${actionData.id}")
+    log.info(s"Asked container for action ${actionData.getId}")
 
   }
 
@@ -245,10 +245,10 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
       val containerTask = Future[ActionData] {
 
         val frameworkFactory = FrameworkProvidersFactory(env, config)
-        val framework = frameworkFactory.getFramework(actionData.groupId)
-        val runnerProvider = framework.getRunnerProvider(actionData.typeId)
+        val framework = frameworkFactory.getFramework(actionData.getGroupId)
+        val runnerProvider = framework.getRunnerProvider(actionData.getTypeId)
         val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-        val commands: List[String] = List(runnerProvider.getCommand(jobManager.jobId, actionData, env, s"${actionData.id}-${container.getId.getContainerId}", address))
+        val commands: List[String] = List(runnerProvider.getCommand(jobManager.jobId, actionData, env, s"${actionData.getId}-${container.getId.getContainerId}", address))
 
         log.info("Running container id {}.", container.getId.getContainerId)
         log.info("Running container id {} with command '{}'", container.getId.getContainerId, commands.last)
@@ -283,7 +283,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
 
         //adding the framework and executor resources
         setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
-        setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.typeId}", resources, s"${framework.getGroupIdentifier}-${actionData.typeId}")
+        setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.getTypeId}", resources, s"${framework.getGroupIdentifier}-${actionData.getTypeId}")
 
         ctx.setLocalResources(resources)
 
@@ -305,9 +305,9 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
           askContainer(actionData)
 
         case Success(requestedActionData) =>
-          jobManager.actionStarted(requestedActionData.id)
+          jobManager.actionStarted(requestedActionData.getId)
           containersIdsToTask.put(container.getId.getContainerId, requestedActionData)
-          log.info(s"launching container succeeded: ${container.getId.getContainerId}; task: ${requestedActionData.id}")
+          log.info(s"launching container succeeded: ${container.getId.getContainerId}; task: ${requestedActionData.getId}")
 
       }
     }
@@ -371,15 +371,16 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
         val task = containersIdsToTask(containerId)
         rmClient.releaseAssignedContainer(status.getContainerId)
 
+        val taskId = task.getId
         if (status.getExitStatus == 0) {
 
           //completedContainersAndTaskIds.put(containerId, task.id)
-          jobManager.actionComplete(task.id)
-          log.info(s"Container $containerId complete with task ${task.id} with success.")
+          jobManager.actionComplete(taskId)
+          log.info(s"Container $containerId complete with task ${taskId} with success.")
         } else {
           // TODO: Check the getDiagnostics value and see if appropriate
-          jobManager.actionFailed(task.id, status.getDiagnostics)
-          log.warn(s"Container $containerId complete with task ${task.id} with failed status code (${status.getExitStatus})")
+          jobManager.actionFailed(taskId, status.getDiagnostics)
+          log.warn(s"Container $containerId complete with task ${taskId} with failed status code (${status.getExitStatus})")
         }
       }
     }
index 8bc9dd7..17b3a68 100644 (file)
@@ -108,7 +108,7 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync,
                          | java -cp executor.jar:spark-${config.Webserver.sparkVersion}/lib/*
                          | -Dscala.usejavacp=true
                          | -Djava.library.path=/usr/lib org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher
-                         | ${jobManager.jobId} ${config.master} ${actionData.name} ${gson.toJson(taskData)} ${gson.toJson(execData)}""".stripMargin
+                         | ${jobManager.jobId} ${config.master} ${actionData.getName} ${gson.toJson(taskData)} ${gson.toJson(execData)}""".stripMargin
         ctx.setCommands(Collections.singletonList(command))
 
         ctx.setLocalResources(Map[String, LocalResource] (
@@ -116,7 +116,7 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync,
         ))
 
         nmClient.startContainerAsync(container, ctx)
-        actionData.id
+        actionData.getId
       }
 
       containerTask onComplete {