crated kotlin SequentialAction and ErrorAction implementations
authorYaniv Rodenski <roadan@gmail.com>
Tue, 13 Nov 2018 00:35:02 +0000 (11:35 +1100)
committerYaniv Rodenski <roadan@gmail.com>
Tue, 13 Nov 2018 00:35:02 +0000 (11:35 +1100)
leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/ErrorAction.kt [new file with mode: 0644]
leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/SequentialAction.kt [new file with mode: 0644]
leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/SequentialActionBase.kt [new file with mode: 0644]

diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/ErrorAction.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/ErrorAction.kt
new file mode 100644 (file)
index 0000000..2029cbc
--- /dev/null
@@ -0,0 +1,31 @@
+package org.apache.amaterasu.leader.common.execution.actions
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.curator.framework.CuratorFramework
+import org.apache.zookeeper.CreateMode
+import java.util.concurrent.BlockingQueue
+
+class ErrorAction(name: String,
+                  src: String,
+                  parent: String,
+                  groupId: String,
+                  typeId: String,
+                  jobId: String,
+                  queue: BlockingQueue<ActionData>,
+                  zkClient: CuratorFramework) : SequentialActionBase() {
+
+    init {
+        jobsQueue = queue
+
+        // creating a znode for the action
+        client = zkClient
+        actionPath = client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId/task-$parent-error", ActionStatus.pending.toString().toByteArray())
+        actionId = actionPath.substring(actionPath.indexOf('-') + 1).replace("/", "-")
+
+        this.jobId = jobId
+        data =  ActionData (ActionStatus.pending, name, src, groupId, typeId, actionId, hashMapOf(), arrayListOf())
+        jobsQueue = queue
+        client = zkClient
+    }
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/SequentialAction.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/SequentialAction.kt
new file mode 100644 (file)
index 0000000..6362e89
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution.actions
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.curator.framework.CuratorFramework
+import org.apache.zookeeper.CreateMode
+import java.util.concurrent.BlockingQueue
+
+class SequentialAction(name: String,
+                       src: String,
+                       groupId: String,
+                       typeId: String,
+                       exports: Map<String, String>,
+                       jobId: String,
+                       queue: BlockingQueue<ActionData>,
+                       zkClient: CuratorFramework,
+                       attempts: Int): SequentialActionBase() {
+    init {
+        this.jobsQueue = queue
+
+        // creating a znode for the action
+        client = zkClient
+        actionPath = client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/$jobId/task-", ActionStatus.pending.toString().toByteArray())
+        actionId = actionPath.substring(actionPath.indexOf("task-") + 5)
+
+        this.attempts = attempts
+        this.jobId = jobId
+        val javaExports = exports
+        data = ActionData(ActionStatus.pending, name, src, groupId, typeId, actionId, javaExports, arrayListOf())
+        jobsQueue = queue
+        client = zkClient
+
+    }
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/SequentialActionBase.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/SequentialActionBase.kt
new file mode 100644 (file)
index 0000000..bcf85d2
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution.actions
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import java.util.concurrent.BlockingQueue
+
+open class SequentialActionBase : Action() {
+
+
+    var jobId: String = ""
+    lateinit var jobsQueue: BlockingQueue<ActionData>
+    var attempts: Int = 2
+    private var attempt: Int = 1
+
+    override fun execute() {
+
+        try {
+
+            announceQueued()
+            jobsQueue.add(data)
+
+        }
+        catch(e: Exception) {
+
+            //TODO: this will not invoke the error action
+            e.message?.let{ handleFailure(it) }
+
+        }
+
+    }
+
+    override fun handleFailure(message: String): String {
+
+        println("Part ${data.name} of group ${data.groupId} and of type ${data.typeId} failed on attempt $attempt with message: $message")
+        attempt += 1
+
+        lateinit var result: String
+        if (attempt <= attempts) {
+            result = data.id
+        }
+        else {
+            announceFailure()
+            println("===> moving to err action ${data.errorActionId}")
+            data.status = ActionStatus.failed
+            result = data.errorActionId
+        }
+        return result
+    }
+
+}