AMATERASU-51: Add support for configuring the path for action level environment path
authorEran Bartenstein <eran.bartenstein@nab.com.au>
Thu, 25 Oct 2018 06:06:31 +0000 (17:06 +1100)
committerEran Bartenstein <eran.bartenstein@nab.com.au>
Thu, 25 Oct 2018 06:06:31 +0000 (17:06 +1100)
Convert Action, ActionData & ActionStatus to Kotlin

common/build.gradle
common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt [new file with mode: 0644]
common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ActionData.kt [moved from common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionData.scala with 69% similarity, mode: 0644]
common/src/main/scala/org/apache/amaterasu/common/configuration/enums/ActionStatus.scala [deleted file]
gradle/wrapper/gradle-wrapper.properties
leader/build.gradle
leader/src/main/kotlin/org/apache/amaterasu/leader/execution/Action.kt [new file with mode: 0644]
leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala
leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala [moved from leader/src/test/scala/org/apache/amaterasu/common/execution/ActionTests.scala with 84% similarity]

index 5a0a211..bc410a8 100644 (file)
@@ -16,6 +16,7 @@
  */
 plugins {
     id 'com.github.johnrengelman.shadow' version '1.2.4'
+    id 'kotlin'
     id 'scala'
 }
 
@@ -65,4 +66,27 @@ dependencies {
 task copyToHome() {
 }
 
+sourceSets {
+    test {
+        resources.srcDirs += [file('src/test/resources')]
+    }
+
+    // this is done so Scala will compile before Kotlin
+    main {
+        scala {
+            srcDirs = ['src/main/kotlin', 'src/main/java', 'src/main/scala']
+        }
+        java {
+            srcDirs = []
+        }
+    }
+}
+
+compileKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
+compileTestKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
+
 
diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt
new file mode 100644 (file)
index 0000000..df12350
--- /dev/null
@@ -0,0 +1,13 @@
+package org.apache.amaterasu.common.configuration.enums
+
+/**
+ * Created by Eran Bartenstein on 21/10/18.
+ */
+enum class ActionStatus (val value: String) {
+    pending("pending"),
+    queued("queued"),
+    started("started"),
+    completed("started"),
+    failed("failed"),
+    canceled("canceled")
+}
\ No newline at end of file
  */
 package org.apache.amaterasu.common.dataobjects
 
-import org.apache.amaterasu.common.configuration.enums.ActionStatus.ActionStatus
-
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import scala.collection.mutable.ListBuffer
 
-case class ActionData(var status: ActionStatus,
-                      name: String,
-                      src: String,
-                      groupId: String,
-                      typeId: String,
-                      id: String,
-                      exports: Map[String, String],
-                      nextActionIds: ListBuffer[String]) {
-  var errorActionId: String = _
+data class ActionData(var status: ActionStatus,
+                      var name: String,
+                      var src: String,
+                      var groupId: String,
+                      var typeId: String,
+                      var id: String,
+                      var exports: Map<String, String>,
+                      var nextActionIds: ListBuffer<String>) {
+    lateinit var errorActionId: String
 }
diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/enums/ActionStatus.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/enums/ActionStatus.scala
deleted file mode 100755 (executable)
index 4d2afa3..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.configuration.enums
-
-object ActionStatus extends Enumeration {
-  type ActionStatus = Value
-  val pending = Value("pending")
-  val queued = Value("queued")
-  val started = Value("started")
-  val complete = Value("complete")
-  val failed = Value("failed")
-  val canceled = Value("canceled")
-}
\ No newline at end of file
index a95009c..7dc503f 100644 (file)
@@ -1,5 +1,5 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-all.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
index 147f1cb..0450748 100644 (file)
@@ -17,6 +17,7 @@
 plugins {
     id "com.github.johnrengelman.shadow" version "1.2.4"
     id 'com.github.maiflai.scalatest' version '0.22'
+    id 'kotlin'
     id 'scala'
     id 'java'
 
@@ -83,7 +84,7 @@ sourceSets {
     // this is done so Scala will compile before Java
     main {
         scala {
-            srcDirs = ['src/main/scala', 'src/main/java', 'src/main/kotlin']
+            srcDirs = ['src/main/kotlin', 'src/main/scala', 'src/main/java']
         }
         java {
             srcDirs = []
diff --git a/leader/src/main/kotlin/org/apache/amaterasu/leader/execution/Action.kt b/leader/src/main/kotlin/org/apache/amaterasu/leader/execution/Action.kt
new file mode 100644 (file)
index 0000000..f22b58f
--- /dev/null
@@ -0,0 +1,26 @@
+package org.apache.amaterasu.leader.execution
+
+
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.curator.framework.CuratorFramework
+
+/**
+ * Created by Eran Bartenstein on 19/10/18.
+ */
+abstract class Action() : Logging {
+    lateinit var actionPath: String
+    lateinit var actionId: String
+    lateinit var client: CuratorFramework
+    lateinit var data: ActionData
+    abstract fun execute()
+    abstract fun handleFailure(message: String) : String
+
+    fun announceStart() {
+        log().debug("Starting action ${data.name} of group ${data.groupId} and type ${data.typeId}")
+        val startedAction: String = ActionStatus.started.value
+        client.setData().forPath(actionPath, startedAction.toByteArray())
+        data.status = ActionStatus.started
+    }
+}
\ No newline at end of file
index 38f4b7c..abb7264 100755 (executable)
@@ -21,7 +21,6 @@ import java.util.concurrent.BlockingQueue
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.execution.actions.Action
 import org.apache.curator.framework.CuratorFramework
 
 import scala.collection.concurrent.TrieMap
@@ -16,6 +16,7 @@
  */
 package org.apache.amaterasu.common.execution
 
+import java.util
 import java.util.concurrent.LinkedBlockingQueue
 
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
@@ -27,13 +28,13 @@ import org.apache.curator.test.TestingServer
 import org.apache.zookeeper.CreateMode
 import org.scalatest.{FlatSpec, Matchers}
 
-class ActionTests extends FlatSpec with Matchers {
+class ActionStatusTests extends FlatSpec with Matchers {
 
   // setting up a testing zookeeper server (curator TestServer)
   val retryPolicy = new ExponentialBackoffRetry(1000, 3)
   val server = new TestingServer(2181, true)
   val jobId = s"job_${System.currentTimeMillis}"
-  val data = ActionData(ActionStatus.pending, "test_action", "start.scala", "spark","scala", null, Map.empty , null)
+  val data = new ActionData(ActionStatus.pending, "test_action", "start.scala", "spark","scala", null, new util.HashMap() , null)
 
   "an Action" should "queue it's ActionData int the job queue when executed" in {
 
@@ -44,11 +45,11 @@ class ActionTests extends FlatSpec with Matchers {
     client.start()
 
     client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
-    val action = SequentialAction(data.name, data.src, data.groupId, data.typeId, Map.empty, jobId, queue, client, 1)
+    val action = SequentialAction(data.getName, data.getSrc, data.getGroupId, data.getTypeId, Map.empty, jobId, queue, client, 1)
 
     action.execute()
-    queue.peek().name should be(data.name)
-    queue.peek().src should be(data.src)
+    queue.peek().getName() should be(data.getName)
+    queue.peek().getSrc should be(data.getSrc)
 
   }