Action Kotlin refactoring fixed
authorYaniv Rodenski <roadan@gmail.com>
Sun, 4 Nov 2018 02:13:26 +0000 (13:13 +1100)
committerYaniv Rodenski <roadan@gmail.com>
Sun, 4 Nov 2018 02:13:26 +0000 (13:13 +1100)
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt [moved from leader/src/main/kotlin/org/apache/amaterasu/leader/execution/actions/Action.kt with 64% similarity]
leader/build.gradle
leader/src/main/scala/org/apache/amaterasu/leader/common/actions/SequentialAction.scala [moved from leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/SequentialAction.scala with 97% similarity]
leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala
leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala

index 9d5405e..4653215 100644 (file)
@@ -17,7 +17,7 @@ class PySparkRunnerProvider extends RunnerSetupProvider {
     case "mesos" =>
       s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz " +
       s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* " +
-      s"-Dscala.usejavacp=true -Djava.library.path=$libPath org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}.stripMargin"
+      s"-Dscala.usejavacp=true -Djava.library.path=$libPath org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.getName}.stripMargin"
     case "yarn" => "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && " +
       s"/bin/bash spark/bin/load-spark-env.sh && " +
       s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/ " +
@@ -25,7 +25,7 @@ class PySparkRunnerProvider extends RunnerSetupProvider {
       "-Dscala.usejavacp=true " +
       "-Dhdp.version=2.6.1.0-129 " +
       "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
-      s"'$jobId' '${conf.master}' '${actionData.name}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " +
+      s"'$jobId' '${conf.master}' '${actionData.getName}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " +
       s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
       s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
     case _ => ""
index c92a784..e108d1d 100644 (file)
@@ -34,14 +34,14 @@ class SparkScalaRunnerProvider extends RunnerSetupProvider {
       s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz " +
       s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* " +
       s"-Dscala.usejavacp=true -Djava.library.path=$libPath " +
-      s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}".stripMargin
+      s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.getName}".stripMargin
     case "yarn" => s"/bin/bash spark/bin/load-spark-env.sh && " +
       s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/ " +
       "-Xmx2G " +
       "-Dscala.usejavacp=true " +
       "-Dhdp.version=2.6.1.0-129 " +
       "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
-      s"'$jobId' '${conf.master}' '${actionData.name}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " +
+      s"'$jobId' '${conf.master}' '${actionData.getName}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " +
       s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
       s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
     case _ => ""
@@ -1,5 +1,20 @@
-package org.apache.amaterasu.leader.execution.actions
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution.actions
 
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
@@ -9,7 +24,7 @@ import org.apache.curator.framework.CuratorFramework
 /**
  * Created by Eran Bartenstein on 19/10/18.
  */
-abstract class Action() : Logging {
+abstract class Action : Logging {
     lateinit var actionPath: String
     lateinit var actionId: String
     lateinit var client: CuratorFramework
index 25f3cbc..dc244fc 100644 (file)
@@ -17,8 +17,8 @@
 plugins {
     id "com.github.johnrengelman.shadow" version "1.2.4"
     id 'com.github.maiflai.scalatest' version '0.22'
-    id 'org.jetbrains.kotlin.jvm'
     id 'scala'
+    id 'org.jetbrains.kotlin.jvm'
     id 'java'
 
 }
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.leader.execution.actions
+package org.apache.amaterasu.leader.common.actions
 
 import java.util
 import java.util.concurrent.BlockingQueue
 
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.leader.common.execution.actions.Action
 import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
 
index 6dc63ce..e64e3d9 100755 (executable)
@@ -23,8 +23,8 @@ import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.leader.execution.JobManager
-import org.apache.amaterasu.leader.execution.actions.{ErrorAction, SequentialAction}
-import org.apache.amaterasu.leader.execution.actions.Action
+import org.apache.amaterasu.leader.common.actions.{ErrorAction, SequentialAction}
+import org.apache.amaterasu.leader.common.execution.actions.Action
 import org.apache.curator.framework.CuratorFramework
 
 import scala.collection.JavaConverters._
index b9cfded..70642db 100755 (executable)
@@ -21,7 +21,7 @@ import java.util.concurrent.BlockingQueue
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.execution.actions.Action
+import org.apache.amaterasu.leader.common.execution.actions.Action
 import org.apache.curator.framework.CuratorFramework
 
 import scala.collection.concurrent.TrieMap
@@ -102,8 +102,7 @@ class JobManager extends Logging {
 
     val action = registeredActions.get(actionId).get
     action.announceComplete
-    action.data.getNextActionIds.forEach(id =>
-      registeredActions.get(id).get.execute())
+    action.data.getNextActionIds.toArray.foreach(id => registeredActions.get(id.toString).get.execute())
 
     // we don't need the error action anymore
     if (action.data.errorActionId != null)
@@ -134,8 +133,8 @@ class JobManager extends Logging {
     if (action.data.getStatus != ActionStatus.failed)
       action.announceCanceled
 
-    action.data.getNextActionIds.forEach(id =>
-      cancelFutureActions(registeredActions.get(id).get))
+    action.data.getNextActionIds.toArray.foreach(id =>
+      cancelFutureActions(registeredActions.get(id.toString).get))
   }
 
   /**
index 75811fb..af2c8ba 100755 (executable)
@@ -21,7 +21,7 @@ import java.util.concurrent.LinkedBlockingQueue
 
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.execution.actions.SequentialAction
+import org.apache.amaterasu.leader.common.actions.SequentialAction
 import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.test.TestingServer