AMATERASU-51: Add support for configuring the path for action level environment path
authorEran Bartenstein <eran.bartenstein@nab.com.au>
Sat, 10 Nov 2018 09:46:50 +0000 (20:46 +1100)
committerEran Bartenstein <eran.bartenstein@nab.com.au>
Sat, 10 Nov 2018 09:46:50 +0000 (20:46 +1100)
Move JobManager from Scala -> Kotlin

leader/build.gradle
leader/src/main/kotlin/org/apache/amaterasu/leader/execution/JobManager.kt [new file with mode: 0644]
leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala [deleted file]

index dc244fc..ea1c3cb 100644 (file)
@@ -20,7 +20,6 @@ plugins {
     id 'scala'
     id 'org.jetbrains.kotlin.jvm'
     id 'java'
-
 }
 
 sourceCompatibility = 1.8
@@ -92,7 +91,7 @@ sourceSets {
             srcDirs = ['src/main/kotlin','src/main/java', 'src/main/scala']
         }
         java {
-            srcDirs = []
+            srcDirs = ['src/main/java']
         }
     }
 }
@@ -120,6 +119,16 @@ task copyToHome() {
 compileKotlin{
     kotlinOptions.jvmTarget = "1.8"
 }
+
 compileTestKotlin {
     kotlinOptions.jvmTarget = "1.8"
 }
+
+compileScala {
+    dependsOn compileJava
+    classpath += files(compileJava.destinationDir) + files(compileKotlin.destinationDir)
+}
+
+compileJava {
+    dependsOn compileKotlin
+}
diff --git a/leader/src/main/kotlin/org/apache/amaterasu/leader/execution/JobManager.kt b/leader/src/main/kotlin/org/apache/amaterasu/leader/execution/JobManager.kt
new file mode 100644 (file)
index 0000000..984b719
--- /dev/null
@@ -0,0 +1,130 @@
+package org.apache.amaterasu.leader.execution
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.leader.common.execution.actions.Action
+import org.apache.curator.framework.CuratorFramework
+import java.util.concurrent.BlockingQueue
+
+/**
+ * Created by Eran Bartenstein on 10/11/18.
+ */
+class JobManager : KLogging() {
+    var name: String = ""
+    var jobId: String = ""
+    lateinit var client: CuratorFramework
+    lateinit var head: Action
+
+    // TODO: this is not private due to tests, fix this!!!
+    val registeredActions = HashMap<String, Action>()
+    val frameworks = HashMap<String, HashSet<String>>()
+    private lateinit var executionQueue: BlockingQueue<ActionData>
+
+    /**
+     * The start method initiates the job execution by executing the first action.
+     * start mast be called once and by the JobManager only
+     */
+    fun start(): Unit = head.execute()
+
+    val outOfActions: Boolean = registeredActions.filterValues {
+        action -> action.data.status == ActionStatus.pending ||
+            action.data.status == ActionStatus.queued ||
+            action.data.status == ActionStatus.started
+    }.isEmpty()
+
+    /**
+     * getNextActionData returns the data of the next action to be executed if such action
+     * exists
+     *
+     * @return the ActionData of the next action, returns null if no such action exists
+     */
+    fun getNextActionData(): ActionData {
+
+        val nextAction: ActionData = executionQueue.poll()
+
+        if (nextAction != null) {
+            registeredActions[nextAction.id]!!.announceStart()
+        }
+
+        return nextAction
+    }
+
+    fun reQueueAction(actionId: String) {
+
+        val action = registeredActions[actionId]
+        executionQueue.put(action!!.data)
+        registeredActions[actionId]!!.announceQueued()
+
+    }
+
+    /**
+     * Registers an action with the job
+     *
+     * @param action
+     */
+    fun registerAction(action: Action) {
+        registeredActions.put(action.actionId, action)
+    }
+
+    /**
+     * announce the completion of an action and executes the next actions
+     *
+     * @param actionId
+     */
+    fun actionComplete(actionId: String) {
+
+        val action = registeredActions[actionId]
+        action!!.announceComplete()
+        action.data.nextActionIds.forEach{id -> registeredActions[id]!!.execute()}
+
+        // we don't need the error action anymore
+        if (action.data.errorActionId != null)
+            registeredActions[action.data.errorActionId]!!.announceCanceled()
+    }
+
+    /**
+     * gets the next action id which can be either the same action or an error action
+     * and if it exist (we have an error action or a retry)
+     *
+     * @param actionId
+     */
+    fun actionFailed(actionId: String, message: String) {
+
+        log.warn(message)
+
+        val action = registeredActions[actionId]
+        val id = action!!.handleFailure(message)
+        if (id != null)
+            registeredActions[id]?.execute()
+
+        //delete all future actions
+        cancelFutureActions(action)
+    }
+
+    fun cancelFutureActions(action: Action) {
+
+        if (action.data.status != ActionStatus.failed)
+            action.announceCanceled()
+
+        action.data.nextActionIds.forEach{id ->
+            val registeredAction = registeredActions[id]
+            if (registeredAction != null) {
+                cancelFutureActions(registeredAction)
+            }
+        }
+    }
+
+    /**
+     * announce the start of execution of the action
+     */
+    fun actionStarted(actionId: String) {
+
+        val action = registeredActions[actionId]
+        action?.announceStart()
+
+    }
+
+    fun actionsCount(): Int = executionQueue.size
+
+}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala
deleted file mode 100755 (executable)
index 70642db..0000000
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.execution
-
-import java.util.concurrent.BlockingQueue
-
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.common.execution.actions.Action
-import org.apache.curator.framework.CuratorFramework
-
-import scala.collection.concurrent.TrieMap
-import scala.collection.mutable
-
-/**
-  * The JobManager manages the lifecycle of a job. It queues new actions for execution,
-  * tracks the state of actions and is in charge of communication with the underlying
-  * cluster management framework (mesos)
-  */
-class JobManager extends Logging {
-
-  var name: String = _
-  var jobId: String = _
-  var client: CuratorFramework = _
-  var head: Action = _
-
-  // TODO: this is not private due to tests, fix this!!!
-  val registeredActions = new TrieMap[String, Action]
-  val frameworks = new TrieMap[String, mutable.HashSet[String]]
-  private var executionQueue: BlockingQueue[ActionData] = _
-
-  /**
-    * The start method initiates the job execution by executing the first action.
-    * start mast be called once and by the JobManager only
-    */
-  def start(): Unit = {
-
-    head.execute()
-
-  }
-
-  def outOfActions: Boolean = !registeredActions.values.exists(a => a.data.getStatus == ActionStatus.pending ||
-    a.data.getStatus == ActionStatus.queued ||
-    a.data.getStatus == ActionStatus.started)
-  /**
-    * getNextActionData returns the data of the next action to be executed if such action
-    * exists
-    *
-    * @return the ActionData of the next action, returns null if no such action exists
-    */
-  def getNextActionData: ActionData = {
-
-    val nextAction: ActionData = executionQueue.poll()
-
-    if (nextAction != null) {
-      registeredActions(nextAction.getId).announceStart
-    }
-
-    nextAction
-  }
-
-  def reQueueAction(actionId: String): Unit = {
-
-    val action = registeredActions(actionId)
-    executionQueue.put(action.data)
-    registeredActions(actionId).announceQueued
-
-  }
-
-  /**
-    * Registers an action with the job
-    *
-    * @param action
-    */
-  def registerAction(action: Action): Unit = {
-
-    registeredActions.put(action.actionId, action)
-
-  }
-
-  /**
-    * announce the completion of an action and executes the next actions
-    *
-    * @param actionId
-    */
-  def actionComplete(actionId: String): Unit = {
-
-    val action = registeredActions.get(actionId).get
-    action.announceComplete
-    action.data.getNextActionIds.toArray.foreach(id => registeredActions.get(id.toString).get.execute())
-
-    // we don't need the error action anymore
-    if (action.data.errorActionId != null)
-      registeredActions.get(action.data.errorActionId).get.announceCanceled
-  }
-
-  /**
-    * gets the next action id which can be either the same action or an error action
-    * and if it exist (we have an error action or a retry)
-    *
-    * @param actionId
-    */
-  def actionFailed(actionId: String, message: String): Unit = {
-
-    log.warn(message)
-
-    val action = registeredActions.get(actionId).get
-    val id = action.handleFailure(message)
-    if (id != null)
-      registeredActions.get(id).get.execute()
-
-    //delete all future actions
-    cancelFutureActions(action)
-  }
-
-  def cancelFutureActions(action: Action): Unit = {
-
-    if (action.data.getStatus != ActionStatus.failed)
-      action.announceCanceled
-
-    action.data.getNextActionIds.toArray.foreach(id =>
-      cancelFutureActions(registeredActions.get(id.toString).get))
-  }
-
-  /**
-    * announce the start of execution of the action
-    */
-  def actionStarted(actionId: String): Unit = {
-
-    val action = registeredActions.get(actionId).get
-    action.announceStart
-
-  }
-
-  def actionsCount(): Int = {
-    executionQueue.size()
-  }
-}
-
-object JobManager {
-
-  /**
-    * The apply method starts the job execution once the job is created from the maki.yaml file
-    * it is in charge of creating the internal flow map, setting up ZooKeeper and executing
-    * the first action
-    * If the job execution is resumed (a job that was stooped) the init method will restore the
-    * state of the job from ZooKepper
-    *
-    * @param jobId
-    * @param name
-    * @param jobsQueue
-    * @param client
-    * @return
-    */
-  def apply(
-    jobId: String,
-    name: String,
-    jobsQueue: BlockingQueue[ActionData],
-    client: CuratorFramework
-  ): JobManager = {
-
-    val manager = new JobManager()
-    manager.name = name
-    manager.executionQueue = jobsQueue
-    manager.jobId = jobId
-    manager.client = client
-
-    manager
-
-  }
-
-}