AMATERASU-51: Add support for configuring the path for action level environment path
authorEran Bartenstein <eran.bartenstein@nab.com.au>
Wed, 14 Nov 2018 01:47:13 +0000 (12:47 +1100)
committerEran Bartenstein <eran.bartenstein@nab.com.au>
Wed, 14 Nov 2018 01:47:13 +0000 (12:47 +1100)
Move JobParser from Scala -> Kotlin & into leader-common

leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/JobParser.kt [new file with mode: 0644]
leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt [moved from leader/src/main/kotlin/org/apache/amaterasu/leader/execution/JobManager.kt with 92% similarity]
leader/src/main/scala/org/apache/amaterasu/leader/common/actions/SequentialAction.scala [deleted file]
leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala [deleted file]
leader/src/main/scala/org/apache/amaterasu/leader/execution/JobLoader.scala
leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala

diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/JobParser.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/JobParser.kt
new file mode 100644 (file)
index 0000000..9541b77
--- /dev/null
@@ -0,0 +1,146 @@
+package org.apache.amaterasu.leader.common.dsl
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.node.ArrayNode
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.leader.common.execution.actions.Action
+import org.apache.amaterasu.leader.common.execution.actions.ErrorAction
+import org.apache.amaterasu.leader.common.execution.actions.SequentialAction
+import org.apache.amaterasu.leader.common.execution.JobManager
+import org.apache.curator.framework.CuratorFramework
+import java.io.File
+import java.util.concurrent.BlockingQueue
+
+/**
+ * Created by Eran Bartenstein on 11/11/18.
+ */
+class JobParser {
+    fun loadMakiFile(): String = File("repo/maki.yml").readText(Charsets.UTF_8)
+
+    /**
+     * Parses the maki.yml string and creates a job manager
+     *
+     * @param jobId
+     * @param maki a string containing the YAML definition of the job
+     * @param actionsQueue
+     * @param client
+     * @return
+     */
+    fun parse(jobId: String,
+              maki: String,
+              actionsQueue: BlockingQueue<ActionData>,
+              client: CuratorFramework,
+              attempts: Int): JobManager {
+
+        val mapper = ObjectMapper(YAMLFactory())
+
+        val job = mapper.readTree(maki)
+
+        // loading the job details
+        val manager = JobManager(jobId, job.path("job-name").asText(), actionsQueue, client)
+
+        // iterating the flow list and constructing the job's flow
+        val actions = (job.path("flow") as ArrayNode).toList()
+
+        parseActions(actions, manager, actionsQueue, attempts, null)
+
+        return manager
+    }
+
+    fun parseActions(actions: List<JsonNode>,
+                     manager: JobManager,
+                     actionsQueue: BlockingQueue<ActionData>,
+                     attempts: Int,
+                     previous: Action?) {
+
+        if (actions.isEmpty())
+            return
+
+        val actionData = actions.first()
+
+        val action = parseSequentialAction(
+                actionData,
+                manager.jobId,
+                actionsQueue,
+                manager.client,
+                attempts
+        )
+
+        //updating the list of frameworks setup
+        manager.frameworks.getOrPut(action.data.groupId){HashSet()}
+                .add(action.data.typeId)
+
+
+        if (manager.head == null) {
+            manager.head = action
+        }
+
+        if (previous != null) {
+            ArrayList(previous.data.nextActionIds).add(action.actionId)
+        }
+        manager.registerAction(action)
+
+        val errorNode = actionData.path("error")
+
+        if (!errorNode.isMissingNode) {
+
+            val errorAction = parseErrorAction(
+                    errorNode,
+                    manager.jobId,
+                    action.data.id,
+                    actionsQueue,
+                    manager.client
+            )
+
+            action.data.errorActionId = errorAction.data.id
+            manager.registerAction(errorAction)
+
+            //updating the list of frameworks setup
+            manager.frameworks.getOrPut(errorAction.data.groupId){HashSet()}
+                    .add(errorAction.data.typeId)
+        }
+
+        parseActions(actions.drop(1), manager, actionsQueue, attempts, action)
+
+    }
+
+    fun parseSequentialAction(action: JsonNode,
+    jobId: String,
+    actionsQueue: BlockingQueue<ActionData>,
+    client: CuratorFramework,
+    attempts: Int): SequentialAction {
+
+        return  SequentialAction(action.path("name").asText(),
+                action.path("file").asText(),
+                action.path("runner").path("group").asText(),
+                action.path("runner").path("type").asText(),
+                action.path("exports").fields().asSequence().map { it.key to it.value.asText() }.toMap(),
+        jobId,
+        actionsQueue,
+        client,
+        attempts)
+
+    }
+
+    fun parseErrorAction(action: JsonNode,
+                         jobId: String,
+                         parent: String,
+                         actionsQueue: BlockingQueue<ActionData>,
+                         client: CuratorFramework): ErrorAction {
+
+        return ErrorAction(
+                action.path("name").asText(),
+                action.path("file").asText(),
+                parent,
+                action.path("group").asText(),
+                action.path("type").asText(),
+                jobId,
+                actionsQueue,
+                client
+        )
+
+    }
+
+}
@@ -1,4 +1,4 @@
-package org.apache.amaterasu.leader.execution
+package org.apache.amaterasu.leader.common.execution
 
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
@@ -10,16 +10,18 @@ import java.util.concurrent.BlockingQueue
 /**
  * Created by Eran Bartenstein on 10/11/18.
  */
-class JobManager : KLogging() {
-    var name: String = ""
-    var jobId: String = ""
-    lateinit var client: CuratorFramework
+data class JobManager(var name: String = "",
+                 var jobId: String = "",
+                 var executionQueue: BlockingQueue<ActionData>,
+                 var client: CuratorFramework
+                 ) : KLogging() {
+
+
     lateinit var head: Action
 
     // TODO: this is not private due to tests, fix this!!!
     val registeredActions = HashMap<String, Action>()
     val frameworks = HashMap<String, HashSet<String>>()
-    private lateinit var executionQueue: BlockingQueue<ActionData>
 
     /**
      * The start method initiates the job execution by executing the first action.
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/common/actions/SequentialAction.scala b/leader/src/main/scala/org/apache/amaterasu/leader/common/actions/SequentialAction.scala
deleted file mode 100755 (executable)
index d9be4dd..0000000
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.common.actions
-
-import java.util
-import java.util.concurrent.BlockingQueue
-
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.execution.actions.Action
-import org.apache.curator.framework.CuratorFramework
-import org.apache.zookeeper.CreateMode
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-
-class SequentialAction extends Action {
-
-  var jobId: String = _
-  var jobsQueue: BlockingQueue[ActionData] = _
-  var attempts: Int = 2
-  var attempt: Int = 1
-
-  def execute(): Unit = {
-
-    try {
-
-      announceQueued
-      jobsQueue.add(data)
-
-    }
-    catch {
-
-      //TODO: this will not invoke the error action
-      case e: Exception => handleFailure(e.getMessage)
-
-    }
-
-  }
-
-  override def handleFailure(message: String): String = {
-
-    println(s"Part ${data.getName} of group ${data.getGroupId} and of type ${data.getTypeId} failed on attempt $attempt with message: $message")
-    attempt += 1
-
-    if (attempt <= attempts) {
-      data.getId
-    }
-    else {
-      announceFailure()
-      println(s"===> moving to err action ${data.errorActionId}")
-      data.setStatus ( ActionStatus.failed )
-      data.errorActionId
-    }
-
-  }
-
-}
-
-object SequentialAction {
-
-  def apply(name: String,
-            src: String,
-            groupId: String,
-            typeId: String,
-            exports: Map[String, String],
-            jobId: String,
-            queue: BlockingQueue[ActionData],
-            zkClient: CuratorFramework,
-            attempts: Int): SequentialAction = {
-
-    val action = new SequentialAction()
-
-    action.jobsQueue = queue
-
-    // creating a znode for the action
-    action.client = zkClient
-    action.actionPath = action.client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(s"/$jobId/task-", ActionStatus.pending.toString.getBytes())
-    action.actionId = action.actionPath.substring(action.actionPath.indexOf("task-") + 5)
-
-    action.attempts = attempts
-    action.jobId = jobId
-    val javaExports = exports.asJava
-    action.data = new ActionData(ActionStatus.pending, name, src, groupId, typeId, action.actionId, javaExports, new util.ArrayList[String]())
-    action.jobsQueue = queue
-    action.client = zkClient
-
-    action
-  }
-
-}
-
-object ErrorAction {
-
-  def apply(name: String,
-            src: String,
-            parent: String,
-            groupId: String,
-            typeId: String,
-            jobId: String,
-            queue: BlockingQueue[ActionData],
-            zkClient: CuratorFramework): SequentialAction = {
-
-    val action = new SequentialAction()
-
-    action.jobsQueue = queue
-
-    // creating a znode for the action
-    action.client = zkClient
-    action.actionPath = action.client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId/task-$parent-error", ActionStatus.pending.toString.getBytes())
-    action.actionId = action.actionPath.substring(action.actionPath.indexOf('-') + 1).replace("/", "-")
-
-    action.jobId = jobId
-    action.data = new ActionData(ActionStatus.pending, name, src, groupId, typeId, action.actionId, new util.HashMap[String, String](), new util.ArrayList[String]())
-    action.jobsQueue = queue
-    action.client = zkClient
-
-    action
-
-  }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
deleted file mode 100755 (executable)
index e08489c..0000000
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.dsl
-
-import java.util.concurrent.BlockingQueue
-
-import com.fasterxml.jackson.databind.node.ArrayNode
-import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.execution.JobManager
-import org.apache.amaterasu.leader.common.actions.{ErrorAction, SequentialAction}
-import org.apache.amaterasu.leader.common.execution.actions.Action
-import org.apache.curator.framework.CuratorFramework
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.io.Source
-
-/**
-  * The JobParser class is in charge of parsing the maki.yaml file which
-  * describes the workflow of an amaterasu job
-  */
-object JobParser {
-
-  def loadMakiFile(): String = {
-
-    Source.fromFile("repo/maki.yml").mkString
-
-  }
-
-  /**
-    * Parses the maki.yml string and creates a job manager
-    *
-    * @param jobId
-    * @param maki a string containing the YAML definition of the job
-    * @param actionsQueue
-    * @param client
-    * @return
-    */
-  def parse(jobId: String,
-            maki: String,
-            actionsQueue: BlockingQueue[ActionData],
-            client: CuratorFramework,
-            attempts: Int): JobManager = {
-
-    val mapper = new ObjectMapper(new YAMLFactory())
-
-    val job = mapper.readTree(maki)
-
-    // loading the job details
-    val manager = JobManager(jobId, job.path("job-name").asText, actionsQueue, client)
-
-    // iterating the flow list and constructing the job's flow
-    val actions = job.path("flow").asInstanceOf[ArrayNode].asScala.toSeq
-
-    parseActions(actions, manager, actionsQueue, attempts, null)
-
-    manager
-  }
-
-  /**
-    * parseActions is a recursive function, for building the workflow of
-    * the job
-    * God, I miss Clojure
-    *
-    * @param actions  a seq containing the definitions of all the actions
-    * @param manager  the job manager for the job
-    * @param actionsQueue
-    * @param previous the previous action, this is used in order to add the current action
-    *                 to the nextActionIds
-    */
-  def parseActions(actions: Seq[JsonNode],
-                   manager: JobManager,
-                   actionsQueue: BlockingQueue[ActionData],
-                   attempts: Int,
-                   previous: Action): Unit = {
-
-    if (actions.isEmpty)
-      return
-
-    val actionData = actions.head
-
-    val action = parseSequentialAction(
-      actionData,
-      manager.jobId,
-      actionsQueue,
-      manager.client,
-      attempts
-    )
-
-    //updating the list of frameworks setup
-    manager.frameworks.getOrElseUpdate(action.data.getGroupId,
-                                       new mutable.HashSet[String]())
-                                       .add(action.data.getTypeId)
-
-
-    if (manager.head == null) {
-      manager.head = action
-    }
-
-    if (previous != null) {
-      previous.data.getNextActionIds.add(action.actionId)
-    }
-    manager.registerAction(action)
-
-    val errorNode = actionData.path("error")
-
-    if (!errorNode.isMissingNode) {
-
-      val errorAction = parseErrorAction(
-        errorNode,
-        manager.jobId,
-        action.data.getId,
-        actionsQueue,
-        manager.client
-      )
-
-      action.data.errorActionId = errorAction.data.getId
-      manager.registerAction(errorAction)
-
-      //updating the list of frameworks setup
-      manager.frameworks.getOrElseUpdate(errorAction.data.getGroupId,
-        new mutable.HashSet[String]())
-        .add(errorAction.data.getTypeId)
-    }
-
-    parseActions(actions.tail, manager, actionsQueue, attempts, action)
-
-  }
-
-  def parseSequentialAction(action: JsonNode,
-                            jobId: String,
-                            actionsQueue: BlockingQueue[ActionData],
-                            client: CuratorFramework,
-                            attempts: Int): SequentialAction = {
-
-    SequentialAction(action.path("name").asText,
-      action.path("file").asText,
-      action.path("runner").path("group").asText,
-      action.path("runner").path("type").asText,
-      action.path("exports").fields().asScala.toSeq.map(e => (e.getKey, e.getValue.asText())).toMap,
-      jobId,
-      actionsQueue,
-      client,
-      attempts)
-  }
-
-  def parseErrorAction(action: JsonNode,
-                       jobId: String,
-                       parent: String,
-                       actionsQueue: BlockingQueue[ActionData],
-                       client: CuratorFramework): SequentialAction = {
-
-    ErrorAction(
-      action.path("name").asText,
-      action.path("file").asText,
-      parent,
-      action.path("group").asText,
-      action.path("type").asText,
-      jobId,
-      actionsQueue,
-      client
-    )
-
-  }
-}
index 234070d..49df009 100755 (executable)
@@ -21,8 +21,8 @@ import java.util.concurrent.BlockingQueue
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.common.dsl.GitUtil
-import org.apache.amaterasu.leader.dsl.JobParser
+import org.apache.amaterasu.leader.common.dsl.{GitUtil, JobParser}
+import org.apache.amaterasu.leader.common.execution.JobManager
 import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
 
@@ -102,4 +102,4 @@ object JobLoader extends Logging {
 
   }
 
-}
\ No newline at end of file
+}
index bcd7923..9c25ece 100755 (executable)
@@ -34,9 +34,10 @@ import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel
 import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType}
 import org.apache.amaterasu.leader.common.configuration.ConfigManager
+import org.apache.amaterasu.leader.common.execution.JobManager
 import org.apache.amaterasu.leader.common.utilities.DataLoader
 import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory
-import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
+import org.apache.amaterasu.leader.execution.JobLoader
 import org.apache.amaterasu.leader.utilities.HttpServer
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
index 23700f8..ff9029b 100644 (file)
@@ -21,15 +21,16 @@ import java.net.{InetAddress, ServerSocket}
 import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-import javax.jms.Session
 
+import javax.jms.Session
 import org.apache.activemq.ActiveMQConnectionFactory
 import org.apache.activemq.broker.BrokerService
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.leader.common.execution.JobManager
 import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory
-import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
+import org.apache.amaterasu.leader.execution.JobLoader
 import org.apache.amaterasu.leader.utilities.{ActiveReportListener, Args}
 import org.apache.curator.framework.recipes.barriers.DistributedBarrier
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
index 379dd1b..4b8ba3d 100644 (file)
@@ -23,8 +23,8 @@ import java.util.concurrent.ConcurrentHashMap
 import com.google.gson.Gson
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.leader.common.execution.JobManager
 import org.apache.amaterasu.leader.common.utilities.DataLoader
-import org.apache.amaterasu.leader.execution.JobManager
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.async.{AMRMClientAsync, NMClientAsync}
 import org.apache.hadoop.yarn.util.Records
index ef47cc1..e119cbc 100755 (executable)
@@ -19,7 +19,7 @@ package org.apache.amaterasu.common.execution
 import java.util.concurrent.LinkedBlockingQueue
 
 import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.dsl.JobParser
+import org.apache.amaterasu.leader.common.dsl.JobParser
 import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.test.TestingServer
index 13685f9..d92d8c8 100755 (executable)
@@ -19,7 +19,7 @@ package org.apache.amaterasu.common.execution
 import java.util.concurrent.LinkedBlockingQueue
 
 import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.dsl.JobParser
+import org.apache.amaterasu.leader.common.dsl.JobParser
 import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.test.TestingServer
@@ -61,4 +61,4 @@ class JobParserTests extends FlatSpec with Matchers {
 
   }
 
-}
\ No newline at end of file
+}
index 64887ab..adee3d8 100755 (executable)
@@ -20,7 +20,8 @@ import java.util.concurrent.LinkedBlockingQueue
 
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.execution.{JobLoader, JobManager}
+import org.apache.amaterasu.leader.common.execution.JobManager
+import org.apache.amaterasu.leader.execution.JobLoader
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.test.TestingServer
@@ -80,4 +81,4 @@ class JobRestoreTests extends FlatSpec with Matchers with BeforeAndAfterEach {
 
     queue.peek.getName should be("start")
   }
-}
\ No newline at end of file
+}