fixed config parsing
authorYaniv Rodenski <roadan@gmail.com>
Sun, 25 Nov 2018 04:51:20 +0000 (15:51 +1100)
committerYaniv Rodenski <roadan@gmail.com>
Sun, 25 Nov 2018 04:51:20 +0000 (15:51 +1100)
leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/JobParser.kt
leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt
leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/ErrorAction.kt
leader/src/test/resources/simple-maki.yml
leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala

index a7514d7..390058b 100644 (file)
@@ -155,8 +155,10 @@ object JobParser {
                 action.path("name").asText(),
                 action.path("file").asText(),
                 parent,
-                action.path("group").asText(),
-                action.path("type").asText(),
+                action.path("config").asText(),
+                action.path("runner").path("group").asText(),
+                action.path("runner").path("type").asText(),
+
                 jobId,
                 actionsQueue,
                 client
index 07360ab..5810065 100644 (file)
@@ -40,20 +40,12 @@ abstract class Action : KLogging() {
 
     fun announceQueued() {
         log.debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} is queued for execution")
-        println("**********************")
-        println(actionPath)
-        println("queued")
-        println("**********************")
         client.setData().forPath(actionPath, ActionStatus.queued.value.toByteArray())
         data.status = ActionStatus.queued
     }
 
     fun announceComplete() {
         log.debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} complete")
-        println("**********************")
-        println(actionPath)
-        println("complete")
-        println("**********************")
         client.setData().forPath(actionPath, ActionStatus.complete.value.toByteArray())
         data.status = ActionStatus.complete
     }
index 7119510..a9e6de6 100644 (file)
@@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue
 class ErrorAction(name: String,
                   src: String,
                   parent: String,
+                  config: String,
                   groupId: String,
                   typeId: String,
                   jobId: String,
@@ -40,7 +41,7 @@ class ErrorAction(name: String,
         actionId = actionPath.substring(actionPath.indexOf('-') + 1).replace("/", "-")
 
         this.jobId = jobId
-        data =  ActionData (ActionStatus.pending, name, src, groupId, typeId, actionId, hashMapOf(), arrayListOf())
+        data = ActionData(ActionStatus.pending, name, src, config, groupId, typeId, actionId)
         jobsQueue = queue
         client = zkClient
     }
index b9c8e60..f640e11 100755 (executable)
@@ -22,16 +22,16 @@ flow:
       group: spark
       type: scala
       src: simple-spark.scala
-      config: start-cfg
+      config: start-cfg.yaml
     - name: step2
       group: spark
       type: scala
       src: file2.scala
-      config: step2-cfg
+      config: step2-cfg.yaml
       error:
         name: error-action
         group: spark
         type: scala
         src: error.scala
-       config: error-cfg
+        config: error-cfg.yaml
 ...
index 7e739dc..806bae9 100755 (executable)
@@ -36,7 +36,7 @@ class ActionStatusTests extends FlatSpec with Matchers {
   val retryPolicy = new ExponentialBackoffRetry(1000, 3)
   val server = new TestingServer(2181, true)
   val jobId = s"job_${System.currentTimeMillis}"
-  val data = new ActionData(ActionStatus.pending, "test_action", "start.scala", "spark","scala", "0000001", new util.HashMap() , List[String]().asJava)
+  val data = new ActionData(ActionStatus.pending, "test_action", "start.scala", "", "spark","scala", "0000001", new util.HashMap() , List[String]().asJava)
 
   "an Action" should "queue it's ActionData int the job queue when executed" in {
 
@@ -47,7 +47,7 @@ class ActionStatusTests extends FlatSpec with Matchers {
     client.start()
 
     client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
-    val action = new SequentialAction(data.getName, data.getSrc, data.getGroupId, data.getTypeId, Map.empty[String, String].asJava, jobId, queue, client, 1)
+    val action = new SequentialAction(data.getName, data.getSrc, "", data.getGroupId, data.getTypeId, Map.empty[String, String].asJava, jobId, queue, client, 1)
 
     action.execute()
     queue.peek().getName should be(data.getName)
index bcb667e..469335c 100755 (executable)
@@ -48,6 +48,12 @@ class JobExecutionTests extends FlatSpec with Matchers {
 
   val job = JobParser.parse(jobId, yaml, queue, client, 1)
 
+  queue.toArray.foreach(it => {
+    val d = it.asInstanceOf[ActionData]
+    println(s"+++++++> ${d.getName}")
+    println(s"  +++++++> ${d.getErrorActionId}")
+  })
+
   "a job" should "queue the first action when the JobManager.start method is called " in {
 
     job.start
@@ -82,38 +88,38 @@ class JobExecutionTests extends FlatSpec with Matchers {
 
   }
 
-      "the next step2 job" should "be queued as a result of the completion" in {
+  "the next step2 job" should "be queued as a result of the completion" in {
 
-        queue.peek.getName should be("step2")
+    queue.peek.getName should be("step2")
 
-        // making sure that the status is reflected in zk
-        val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
-        new String(actionStatus) should be("queued")
+    // making sure that the status is reflected in zk
+    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
+    new String(actionStatus) should be("queued")
 
-      }
+  }
 
-      it should "be marked as started when JobManager.getNextActionData is called" in {
+  it should "be marked as started when JobManager.getNextActionData is called" in {
 
-        val data = job.getNextActionData
+    val data = job.getNextActionData
 
-        data.getName should be("step2")
+    data.getName should be("step2")
 
-        // making sure that the status is reflected in zk
-        val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
-        new String(actionStatus) should be("started")
-      }
+    // making sure that the status is reflected in zk
+    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
+    new String(actionStatus) should be("started")
+  }
 
-        it should "be marked as failed when JobManager. is called" in {
+  it should "be marked as failed when JobManager. is called" in {
 
-          job.actionFailed("0000000001", "test failure")
-          queue.peek.getName should be ("error-action")
+    job.actionFailed("0000000001", "test failure")
+    queue.peek.getName should be("error-action")
 
-          // making sure that the status is reflected in zk
-          val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error")
-          new String(actionStatus) should be("queued")
+    // making sure that the status is reflected in zk
+    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error")
+    new String(actionStatus) should be("queued")
 
-          // and returned by getNextActionData
-          val data = job.getNextActionData
+    // and returned by getNextActionData
+    val data = job.getNextActionData
 
-        }
+  }
 }
index f04eeb7..00ca300 100755 (executable)
@@ -65,9 +65,9 @@ class JobParserTests extends FlatSpec with Matchers {
 
     job.getRegisteredActions.size should be(3)
 
-    job.getRegisteredActions.get("0000000000").data.getConfig should be("start-cfg")
-    job.getRegisteredActions.get("0000000001").data.getConfig should be("step2-cfg")
-    job.getRegisteredActions.get("0000000001-error").data.getConfig should be("error-cfg")
+    job.getRegisteredActions.get("0000000000").data.getConfig should be("start-cfg.yaml")
+    job.getRegisteredActions.get("0000000001").data.getConfig should be("step2-cfg.yaml")
+    job.getRegisteredActions.get("0000000001-error").data.getConfig should be("error-cfg.yaml")
 
   }