tests are now fixed
authorYaniv Rodenski <roadan@gmail.com>
Sun, 25 Nov 2018 04:21:21 +0000 (15:21 +1100)
committerYaniv Rodenski <roadan@gmail.com>
Sun, 25 Nov 2018 04:21:21 +0000 (15:21 +1100)
common/build.gradle
common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ActionData.kt
leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/JobParser.kt
leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt
leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/SequentialActionBase.kt
leader/build.gradle
leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala

index 9a456ce..4864f62 100644 (file)
@@ -99,9 +99,14 @@ compileKotlin{
     kotlinOptions.jvmTarget = "1.8"
 }
 compileTestKotlin {
+    dependsOn compileScala
     kotlinOptions.jvmTarget = "1.8"
 }
 
+compileTestScala {
+    dependsOn compileScala
+}
+
 compileScala {
     dependsOn compileJava
     classpath += files(compileJava.destinationDir) + files(compileKotlin.destinationDir)
index 7e19db2..e1b72dd 100644 (file)
@@ -30,7 +30,9 @@ data class ActionData(var status: ActionStatus = ActionStatus.pending,
                       var typeId: String= "",
                       var id: String= "",
                       var exports: Map<String, String> = mutableMapOf(),
-                      var nextActionIds: List<String> = listOf()) {
+                      var nextActionIds: MutableList<String> = mutableListOf()) {
     lateinit var errorActionId: String
+    val hasErrorAction: Boolean
+        get() = ::errorActionId.isInitialized
 
 }
index 50c178f..740612b 100644 (file)
@@ -21,10 +21,10 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.databind.node.ArrayNode
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
 import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.leader.common.execution.JobManager
 import org.apache.amaterasu.leader.common.execution.actions.Action
 import org.apache.amaterasu.leader.common.execution.actions.ErrorAction
 import org.apache.amaterasu.leader.common.execution.actions.SequentialAction
-import org.apache.amaterasu.leader.common.execution.JobManager
 import org.apache.curator.framework.CuratorFramework
 import java.io.File
 import java.util.concurrent.BlockingQueue
@@ -55,7 +55,7 @@ object JobParser {
         val job = mapper.readTree(maki)
 
         // loading the job details
-        val manager = JobManager(jobId, job.path("job-name").asText(), actionsQueue, client)
+        val manager = JobManager(job.path("job-name").asText(), jobId, actionsQueue, client)
 
         // iterating the flow list and constructing the job's flow
         val actions = (job.path("flow") as ArrayNode).toList()
@@ -72,6 +72,7 @@ object JobParser {
                      attempts: Int,
                      previous: Action?) {
 
+
         if (actions.isEmpty())
             return
 
@@ -86,16 +87,16 @@ object JobParser {
         )
 
         //updating the list of frameworks setup
-        manager.frameworks.getOrPut(action.data.groupId){HashSet()}
+        manager.frameworks.getOrPut(action.data.groupId) { HashSet() }
                 .add(action.data.typeId)
 
 
-        if (manager.head == null) {
+        if (!manager.isInitialized) {
             manager.head = action
         }
 
-        if (previous != null) {
-            ArrayList(previous.data.nextActionIds).add(action.actionId)
+        previous?.let {
+            previous.data.nextActionIds.add(action.actionId)
         }
         manager.registerAction(action)
 
@@ -115,7 +116,7 @@ object JobParser {
             manager.registerAction(errorAction)
 
             //updating the list of frameworks setup
-            manager.frameworks.getOrPut(errorAction.data.groupId){HashSet()}
+            manager.frameworks.getOrPut(errorAction.data.groupId) { HashSet() }
                     .add(errorAction.data.typeId)
         }
 
@@ -125,20 +126,20 @@ object JobParser {
 
     @JvmStatic
     fun parseSequentialAction(action: JsonNode,
-    jobId: String,
-    actionsQueue: BlockingQueue<ActionData>,
-    client: CuratorFramework,
-    attempts: Int): SequentialAction {
+                              jobId: String,
+                              actionsQueue: BlockingQueue<ActionData>,
+                              client: CuratorFramework,
+                              attempts: Int): SequentialAction {
 
-        return  SequentialAction(action.path("name").asText(),
+        return SequentialAction(action.path("name").asText(),
                 action.path("file").asText(),
                 action.path("runner").path("group").asText(),
                 action.path("runner").path("type").asText(),
                 action.path("exports").fields().asSequence().map { it.key to it.value.asText() }.toMap(),
-        jobId,
-        actionsQueue,
-        client,
-        attempts)
+                jobId,
+                actionsQueue,
+                client,
+                attempts)
 
     }
 
index 16c10b4..b2fe41a 100644 (file)
@@ -41,10 +41,10 @@ data class JobManager(var name: String = "",
      */
     fun start(): Unit = head.execute()
 
-    val outOfActions: Boolean = registeredActions.filterValues {
-        action -> action.data.status == ActionStatus.pending ||
-            action.data.status == ActionStatus.queued ||
-            action.data.status == ActionStatus.started
+    val outOfActions: Boolean = registeredActions.filterValues { action ->
+        action.data.status == ActionStatus.pending ||
+                action.data.status == ActionStatus.queued ||
+                action.data.status == ActionStatus.started
     }.isEmpty()
 
     /**
@@ -53,9 +53,9 @@ data class JobManager(var name: String = "",
      *
      * @return the ActionData of the next action, returns null if no such action exists
      */
-    fun getNextActionData(): ActionData {
+    fun getNextActionData(): ActionData? {
 
-        val nextAction: ActionData = executionQueue.poll()
+        val nextAction: ActionData? = executionQueue.poll()
 
         if (nextAction != null) {
             registeredActions[nextAction.id]!!.announceStart()
@@ -78,7 +78,7 @@ data class JobManager(var name: String = "",
      * @param action
      */
     fun registerAction(action: Action) {
-        registeredActions.put(action.actionId, action)
+        registeredActions[action.actionId] = action
     }
 
     /**
@@ -87,14 +87,18 @@ data class JobManager(var name: String = "",
      * @param actionId
      */
     fun actionComplete(actionId: String) {
-
         val action = registeredActions[actionId]
-        action!!.announceComplete()
-        action.data.nextActionIds.forEach{id -> registeredActions[id]!!.execute()}
+        action?.let {
+
+            it.announceComplete()
+
+            action.data.nextActionIds.forEach { id -> registeredActions[id]!!.execute() }
+
+            // we don't need the error action anymore
+            if (it.data.hasErrorAction)
+                registeredActions[action.data.errorActionId]!!.announceCanceled()
+        }
 
-        // we don't need the error action anymore
-        if (action.data.errorActionId != null)
-            registeredActions[action.data.errorActionId]!!.announceCanceled()
     }
 
     /**
@@ -109,7 +113,7 @@ data class JobManager(var name: String = "",
 
         val action = registeredActions[actionId]
         val id = action!!.handleFailure(message)
-        if (id != null)
+        if (!id.isEmpty())
             registeredActions[id]?.execute()
 
         //delete all future actions
@@ -121,7 +125,7 @@ data class JobManager(var name: String = "",
         if (action.data.status != ActionStatus.failed)
             action.announceCanceled()
 
-        action.data.nextActionIds.forEach{id ->
+        action.data.nextActionIds.forEach { id ->
             val registeredAction = registeredActions[id]
             if (registeredAction != null) {
                 cancelFutureActions(registeredAction)
@@ -141,4 +145,6 @@ data class JobManager(var name: String = "",
 
     fun actionsCount(): Int = executionQueue.size
 
+    val isInitialized: Boolean
+        get() = ::head.isInitialized
 }
index 5810065..07360ab 100644 (file)
@@ -40,12 +40,20 @@ abstract class Action : KLogging() {
 
     fun announceQueued() {
         log.debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} is queued for execution")
+        println("**********************")
+        println(actionPath)
+        println("queued")
+        println("**********************")
         client.setData().forPath(actionPath, ActionStatus.queued.value.toByteArray())
         data.status = ActionStatus.queued
     }
 
     fun announceComplete() {
         log.debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} complete")
+        println("**********************")
+        println(actionPath)
+        println("complete")
+        println("**********************")
         client.setData().forPath(actionPath, ActionStatus.complete.value.toByteArray())
         data.status = ActionStatus.complete
     }
index bcf85d2..49dd6f7 100644 (file)
@@ -50,7 +50,7 @@ open class SequentialActionBase : Action() {
         println("Part ${data.name} of group ${data.groupId} and of type ${data.typeId} failed on attempt $attempt with message: $message")
         attempt += 1
 
-        lateinit var result: String
+        var result: String
         if (attempt <= attempts) {
             result = data.id
         }
index 84c633d..0138fe6 100644 (file)
@@ -121,9 +121,10 @@ compileTestKotlin {
     kotlinOptions.jvmTarget = "1.8"
 }
 
-tet {
-    dependsOn compileJava
+compileTestScala {
+    dependsOn compileScala
 }
+
 compileScala {
     dependsOn compileJava
     classpath += files(compileJava.destinationDir) + files(compileKotlin.destinationDir)
index bc9c679..7e739dc 100755 (executable)
@@ -26,7 +26,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.test.TestingServer
 import org.apache.zookeeper.CreateMode
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.{DoNotDiscover, FlatSpec, Matchers}
 
 import scala.collection.JavaConverters._
 
index e119cbc..bcb667e 100755 (executable)
@@ -40,16 +40,19 @@ class JobExecutionTests extends FlatSpec with Matchers {
   val queue = new LinkedBlockingQueue[ActionData]()
 
   // this will be performed by the job bootstraper
+
   client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
   //  client.setData().forPath(s"/$jobId/src",src.getBytes)
   //  client.setData().forPath(s"/$jobId/branch", branch.getBytes)
 
+
   val job = JobParser.parse(jobId, yaml, queue, client, 1)
 
   "a job" should "queue the first action when the JobManager.start method is called " in {
 
     job.start
-    queue.peek.getName should be ("start")
+
+    queue.peek.getName should be("start")
 
     // making sure that the status is reflected in zk
     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
@@ -59,8 +62,8 @@ class JobExecutionTests extends FlatSpec with Matchers {
 
   it should "return the start action when calling getNextAction and dequeue it" in {
 
-    job.getNextActionData.getName should be ("start")
-    queue.size should be (0)
+    job.getNextActionData.getName should be("start")
+    queue.size should be(0)
 
     // making sure that the status is reflected in zk
     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
@@ -74,42 +77,43 @@ class JobExecutionTests extends FlatSpec with Matchers {
 
     // making sure that the status is reflected in zk
     val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
-    new String(actionStatus) should be("complete")
+
+    new String(new String(actionStatus)) should be("complete")
 
   }
 
-  "the next step2 job" should "be queued as a result of the completion" in {
+      "the next step2 job" should "be queued as a result of the completion" in {
 
-    queue.peek.getName should be ("step2")
+        queue.peek.getName should be("step2")
 
-    // making sure that the status is reflected in zk
-    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
-    new String(actionStatus) should be("queued")
+        // making sure that the status is reflected in zk
+        val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
+        new String(actionStatus) should be("queued")
 
-  }
+      }
 
-  it should "be marked as started when JobManager.getNextActionData is called" in {
+      it should "be marked as started when JobManager.getNextActionData is called" in {
 
-    val data = job.getNextActionData
+        val data = job.getNextActionData
 
-    data.getName should be ("step2")
+        data.getName should be("step2")
 
-    // making sure that the status is reflected in zk
-    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
-    new String(actionStatus) should be("started")
-  }
+        // making sure that the status is reflected in zk
+        val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
+        new String(actionStatus) should be("started")
+      }
 
-  it should "be marked as failed when JobManager. is called" in {
+        it should "be marked as failed when JobManager. is called" in {
 
-    job.actionFailed("0000000001", "test failure")
-    queue.peek.getName should be ("error-action")
+          job.actionFailed("0000000001", "test failure")
+          queue.peek.getName should be ("error-action")
 
-    // making sure that the status is reflected in zk
-    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error")
-    new String(actionStatus) should be("queued")
+          // making sure that the status is reflected in zk
+          val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error")
+          new String(actionStatus) should be("queued")
 
-    // and returned by getNextActionData
-    val data = job.getNextActionData
+          // and returned by getNextActionData
+          val data = job.getNextActionData
 
-  }
+        }
 }
index 3af00e6..9f88322 100755 (executable)
@@ -24,7 +24,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.test.TestingServer
 import org.apache.zookeeper.CreateMode
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.{DoNotDiscover, FlatSpec, Matchers}
 
 import scala.io.Source
 
index adee3d8..b343a6b 100755 (executable)
@@ -26,7 +26,7 @@ import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.test.TestingServer
 import org.apache.zookeeper.CreateMode
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+import org.scalatest.{BeforeAndAfterEach, DoNotDiscover, FlatSpec, Matchers}
 
 import scala.io.Source