AMATERASU-51: Add support for configuring the path for action level environment path
authorEran Bartenstein <eran.bartenstein@nab.com.au>
Tue, 30 Oct 2018 22:29:46 +0000 (09:29 +1100)
committerEran Bartenstein <eran.bartenstein@nab.com.au>
Tue, 30 Oct 2018 22:29:46 +0000 (09:29 +1100)
Complete conversion of Action.scala to Kotlin

common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt
leader/src/main/kotlin/org/apache/amaterasu/leader/execution/Action.kt [deleted file]
leader/src/main/kotlin/org/apache/amaterasu/leader/execution/actions/Action.kt [new file with mode: 0644]
leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala
leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala [deleted file]
leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala

index df12350..24853c5 100644 (file)
@@ -7,7 +7,7 @@ enum class ActionStatus (val value: String) {
     pending("pending"),
     queued("queued"),
     started("started"),
-    completed("started"),
+    complete("complete"),
     failed("failed"),
     canceled("canceled")
-}
\ No newline at end of file
+}
diff --git a/leader/src/main/kotlin/org/apache/amaterasu/leader/execution/Action.kt b/leader/src/main/kotlin/org/apache/amaterasu/leader/execution/Action.kt
deleted file mode 100644 (file)
index f22b58f..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.amaterasu.leader.execution
-
-
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.curator.framework.CuratorFramework
-
-/**
- * Created by Eran Bartenstein on 19/10/18.
- */
-abstract class Action() : Logging {
-    lateinit var actionPath: String
-    lateinit var actionId: String
-    lateinit var client: CuratorFramework
-    lateinit var data: ActionData
-    abstract fun execute()
-    abstract fun handleFailure(message: String) : String
-
-    fun announceStart() {
-        log().debug("Starting action ${data.name} of group ${data.groupId} and type ${data.typeId}")
-        val startedAction: String = ActionStatus.started.value
-        client.setData().forPath(actionPath, startedAction.toByteArray())
-        data.status = ActionStatus.started
-    }
-}
\ No newline at end of file
diff --git a/leader/src/main/kotlin/org/apache/amaterasu/leader/execution/actions/Action.kt b/leader/src/main/kotlin/org/apache/amaterasu/leader/execution/actions/Action.kt
new file mode 100644 (file)
index 0000000..68b78db
--- /dev/null
@@ -0,0 +1,46 @@
+package org.apache.amaterasu.leader.execution.actions
+
+
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.curator.framework.CuratorFramework
+
+/**
+ * Created by Eran Bartenstein on 19/10/18.
+ */
+abstract class Action() : Logging {
+    lateinit var actionPath: String
+    lateinit var actionId: String
+    lateinit var client: CuratorFramework
+    lateinit var data: ActionData
+    abstract fun execute()
+    abstract fun handleFailure(message: String) : String
+
+    fun announceStart() {
+        log().debug("Starting action ${data.name} of group ${data.groupId} and type ${data.typeId}")
+        client.setData().forPath(actionPath, ActionStatus.started.value.toByteArray())
+        data.status = ActionStatus.started
+    }
+
+    fun announceQueued() {
+        log().debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} is queued for execution")
+        client.setData().forPath(actionPath, ActionStatus.queued.value.toByteArray())
+        data.status = ActionStatus.queued
+    }
+
+    fun announceComplete() {
+        log().debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} complete")
+        client.setData().forPath(actionPath, ActionStatus.complete.value.toByteArray())
+        data.status = ActionStatus.complete
+    }
+
+    fun announceCanceled() {
+        log().debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} was canceled")
+        client.setData().forPath(actionPath, ActionStatus.canceled.value.toByteArray())
+        data.status = ActionStatus.canceled
+    }
+
+    protected fun announceFailure() {}
+
+}
index abb7264..b9cfded 100755 (executable)
@@ -21,6 +21,7 @@ import java.util.concurrent.BlockingQueue
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.leader.execution.actions.Action
 import org.apache.curator.framework.CuratorFramework
 
 import scala.collection.concurrent.TrieMap
@@ -53,9 +54,9 @@ class JobManager extends Logging {
 
   }
 
-  def outOfActions: Boolean = !registeredActions.values.exists(a => a.data.status == ActionStatus.pending ||
-    a.data.status == ActionStatus.queued ||
-    a.data.status == ActionStatus.started)
+  def outOfActions: Boolean = !registeredActions.values.exists(a => a.data.getStatus == ActionStatus.pending ||
+    a.data.getStatus == ActionStatus.queued ||
+    a.data.getStatus == ActionStatus.started)
   /**
     * getNextActionData returns the data of the next action to be executed if such action
     * exists
@@ -67,7 +68,7 @@ class JobManager extends Logging {
     val nextAction: ActionData = executionQueue.poll()
 
     if (nextAction != null) {
-      registeredActions(nextAction.id).announceStart
+      registeredActions(nextAction.getId).announceStart
     }
 
     nextAction
@@ -101,7 +102,7 @@ class JobManager extends Logging {
 
     val action = registeredActions.get(actionId).get
     action.announceComplete
-    action.data.nextActionIds.foreach(id =>
+    action.data.getNextActionIds.forEach(id =>
       registeredActions.get(id).get.execute())
 
     // we don't need the error action anymore
@@ -130,10 +131,10 @@ class JobManager extends Logging {
 
   def cancelFutureActions(action: Action): Unit = {
 
-    if (action.data.status != ActionStatus.failed)
+    if (action.data.getStatus != ActionStatus.failed)
       action.announceCanceled
 
-    action.data.nextActionIds.foreach(id =>
+    action.data.getNextActionIds.forEach(id =>
       cancelFutureActions(registeredActions.get(id).get))
   }
 
@@ -184,4 +185,4 @@ object JobManager {
 
   }
 
-}
\ No newline at end of file
+}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala
deleted file mode 100755 (executable)
index f540997..0000000
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.execution.actions
-
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.curator.framework.CuratorFramework
-
-trait Action extends Logging {
-
-  // this is the znode path for the action
-  var actionPath: String = _
-  var actionId: String = _
-
-  var data: ActionData = _
-  var client: CuratorFramework = _
-
-  def execute(): Unit
-
-  def handleFailure(message: String): String
-
-  /**
-    * The announceStart register the beginning of the of the task with ZooKeper
-    */
-  def announceStart: Unit = {
-
-    log.debug(s"Starting action ${data.name} of group ${data.groupId} and type ${data.typeId}")
-    client.setData().forPath(actionPath, ActionStatus.started.toString.getBytes)
-    data.status = ActionStatus.started
-  }
-
-  def announceQueued: Unit = {
-
-    log.debug(s"Action ${data.name} of group ${data.groupId} and of type ${data.typeId} is queued for execution")
-    client.setData().forPath(actionPath, ActionStatus.queued.toString.getBytes)
-    data.status = ActionStatus.queued
-  }
-
-  def announceComplete: Unit = {
-
-    log.debug(s"Action ${data.name} of group ${data.groupId} and of type ${data.typeId} completed")
-    client.setData().forPath(actionPath, ActionStatus.complete.toString.getBytes)
-    data.status = ActionStatus.complete
-  }
-
-  def announceCanceled: Unit = {
-
-    log.debug(s"Action ${data.name} of group ${data.groupId} and of type ${data.typeId} was canceled")
-    client.setData().forPath(actionPath, ActionStatus.canceled.toString.getBytes)
-    data.status = ActionStatus.canceled
-  }
-  protected def announceFailure(): Unit = {}
-
-}
\ No newline at end of file
index 1bbaa15..8c11cc2 100644 (file)
@@ -375,11 +375,11 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
 
           //completedContainersAndTaskIds.put(containerId, task.id)
           jobManager.actionComplete(task.id)
-          log.info(s"Container $containerId completed with task ${task.id} with success.")
+          log.info(s"Container $containerId complete with task ${task.id} with success.")
         } else {
           // TODO: Check the getDiagnostics value and see if appropriate
           jobManager.actionFailed(task.id, status.getDiagnostics)
-          log.warn(s"Container $containerId completed with task ${task.id} with failed status code (${status.getExitStatus})")
+          log.warn(s"Container $containerId complete with task ${task.id} with failed status code (${status.getExitStatus})")
         }
       }
     }
index 24f28cc..8bc9dd7 100644 (file)
@@ -67,9 +67,9 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync,
         val taskId = containersIdsToTaskIds(containerId)
         if (status.getExitStatus == 0) {
           completedContainersAndTaskIds.put(containerId, taskId)
-          log.info(s"Container $containerId completed with task $taskId with success.")
+          log.info(s"Container $containerId complete with task $taskId with success.")
         } else {
-          log.warn(s"Container $containerId completed with task $taskId with failed status code (${status.getExitStatus}.")
+          log.warn(s"Container $containerId complete with task $taskId with failed status code (${status.getExitStatus}.")
           val failedTries = failedTasksCounter.getOrElse(taskId, 0)
           if (failedTries < MAX_ATTEMPTS_PER_TASK) {
             // TODO: notify and ask for a new container