AMATERASU-51: Add support for configuring the path for action level environment path
authorEran Bartenstein <ebarten@gmail.com>
Thu, 8 Nov 2018 01:01:38 +0000 (12:01 +1100)
committerEran Bartenstein <ebarten@gmail.com>
Thu, 8 Nov 2018 01:01:38 +0000 (12:01 +1100)
Move logging into Kotlin and create a bridge between Scala and Kotlin via Java

20 files changed:
build.gradle
common/build.gradle
common/src/main/java/org/apache/amaterasu/common/logging/Logging.java
common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala
executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala
executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala
frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
leader-common/build.gradle
leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt
leader/src/main/scala/org/apache/amaterasu/leader/mesos/Launcher.scala
leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala
leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala
leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.scala
leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala

index ab179be..964f3ff 100644 (file)
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 buildscript {
-    ext.kotlin_version = '1.2.71'
+    ext.kotlin_version = '1.3.0'
 
     repositories {
         mavenCentral()
index a110cdd..9a456ce 100644 (file)
@@ -65,6 +65,12 @@ dependencies {
     testCompile 'junit:junit:4.11'
     testCompile 'org.scalatest:scalatest_2.11:3.0.1'
     testCompile 'org.scala-lang:scala-library:2.11.8'
+    testCompile 'org.jetbrains.spek:spek-api:1.1.5'
+    testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
+    testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5'
+
+    // spek requires kotlin-reflect, can be omitted if already in the classpath
+    testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
 }
 
 task copyToHome() {
@@ -84,7 +90,7 @@ sourceSets {
             srcDirs = ['src/main/kotlin','src/main/java', 'src/main/scala']
         }
         java {
-            srcDirs = []
+            srcDirs = ['src/main/java']
         }
     }
 }
@@ -98,6 +104,7 @@ compileTestKotlin {
 
 compileScala {
     dependsOn compileJava
+    classpath += files(compileJava.destinationDir) + files(compileKotlin.destinationDir)
 }
 
 compileJava {
index 3581560..3f3413f 100644 (file)
@@ -1,7 +1,10 @@
 package org.apache.amaterasu.common.logging;
 
+import org.slf4j.Logger;
+
 /**
  * Created by Eran Bartenstein (p765790) on 5/11/18.
  */
 public abstract class Logging extends KLogging {
+    protected Logger log = getLog();
 }
index fe69260..ee71f85 100755 (executable)
@@ -19,8 +19,9 @@ package org.apache.amaterasu.common.execution.actions
 import com.fasterxml.jackson.annotation.JsonProperty
 import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel
 import org.apache.amaterasu.common.execution.actions.NotificationType.NotificationType
+import org.apache.amaterasu.common.logging.Logging
 
-abstract class Notifier {
+abstract class Notifier extends Logging {
 
   def info(msg: String)
 
index 0c2edf8..90e624b 100644 (file)
@@ -25,7 +25,7 @@ import org.apache.activemq.ActiveMQConnectionFactory
 import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType, Notifier}
 import org.apache.amaterasu.common.logging.Logging
 
-class ActiveNotifier extends Notifier with Logging {
+class ActiveNotifier extends Notifier {
 
   var producer: MessageProducer = _
   var session: Session = _
index 90c2001..c205500 100755 (executable)
@@ -32,7 +32,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 import scala.util.{Failure, Success}
 
-class MesosActionsExecutor extends Executor with Logging {
+class MesosActionsExecutor extends Logging with Executor {
 
   var master: String = _
   var executorDriver: ExecutorDriver = _
index a091c1b..fcb453a 100755 (executable)
@@ -23,7 +23,7 @@ import org.apache.amaterasu.common.logging.Logging
 import org.apache.mesos.ExecutorDriver
 
 
-class MesosNotifier(driver: ExecutorDriver) extends Notifier with Logging {
+class MesosNotifier(driver: ExecutorDriver) extends Notifier {
 
   private val mapper = new ObjectMapper()
   mapper.registerModule(DefaultScalaModule)
index b5f8700..282de68 100644 (file)
@@ -61,7 +61,7 @@ class ActionsExecutor extends Logging {
 
 // launched with args:
 //s"'${jobManager.jobId}' '${config.master}' '${actionData.name}' '${URLEncoder.encode(gson.toJson(taskData), "UTF-8")}' '${URLEncoder.encode(gson.toJson(execData), "UTF-8")}' '${actionData.id}-${container.getId.getContainerId}'"
-object ActionsExecutorLauncher extends App with Logging {
+object ActionsExecutorLauncher extends Logging with App {
 
   val hostName = InetAddress.getLocalHost.getHostName
 
index 841fe42..831cfc8 100644 (file)
@@ -21,7 +21,7 @@ import org.apache.amaterasu.common.logging.Logging
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 
-class YarnNotifier(conf: YarnConfiguration) extends Notifier with Logging {
+class YarnNotifier(conf: YarnConfiguration) extends Notifier {
 
   var rpc: YarnRPC = YarnRPC.create(conf)
 
index e6c0a7d..a48aaa0 100644 (file)
@@ -38,7 +38,7 @@ import scala.collection.JavaConverters._
 import scala.collection.concurrent.TrieMap
 import scala.sys.process._
 
-class SparkRunnersProvider extends RunnersProvider with Logging {
+class SparkRunnersProvider extends Logging with RunnersProvider {
 
   private val runners = new TrieMap[String, AmaterasuRunner]
   private var shellLoger = ProcessLogger(
index 90f8c68..a60c827 100644 (file)
@@ -33,7 +33,7 @@ import scala.sys.process.{Process, ProcessLogger}
 
 
 
-class PySparkRunner extends AmaterasuRunner with Logging {
+class PySparkRunner extends Logging with AmaterasuRunner {
 
   var proc: Process = _
   var notifier: Notifier = _
index 61b9309..6ca9513 100644 (file)
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 buildscript {
-    ext.kotlin_version = '1.2.60'
 
     repositories {
         mavenCentral()
@@ -104,12 +103,3 @@ compileKotlin {
 compileTestKotlin {
     kotlinOptions.jvmTarget = "1.8"
 }
-//
-//kotlin {
-//    experimental {
-//        coroutines 'enable'
-//    }
-//}
-
-//task copyToHome() {
-//}
\ No newline at end of file
index de2da7d..e4f0e3e 100644 (file)
@@ -18,13 +18,14 @@ package org.apache.amaterasu.leader.common.execution.actions
 
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.logging.KLogging
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.curator.framework.CuratorFramework
 
 /**
  * Created by Eran Bartenstein on 19/10/18.
  */
-abstract class Action : Logging {
+abstract class Action : KLogging() {
     lateinit var actionPath: String
     lateinit var actionId: String
     lateinit var client: CuratorFramework
@@ -33,25 +34,25 @@ abstract class Action : Logging {
     abstract fun handleFailure(message: String) : String
 
     fun announceStart() {
-        log().debug("Starting action ${data.name} of group ${data.groupId} and type ${data.typeId}")
+        log.debug("Starting action ${data.name} of group ${data.groupId} and type ${data.typeId}")
         client.setData().forPath(actionPath, ActionStatus.started.value.toByteArray())
         data.status = ActionStatus.started
     }
 
     fun announceQueued() {
-        log().debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} is queued for execution")
+        log.debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} is queued for execution")
         client.setData().forPath(actionPath, ActionStatus.queued.value.toByteArray())
         data.status = ActionStatus.queued
     }
 
     fun announceComplete() {
-        log().debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} complete")
+        log.debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} complete")
         client.setData().forPath(actionPath, ActionStatus.complete.value.toByteArray())
         data.status = ActionStatus.complete
     }
 
     fun announceCanceled() {
-        log().debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} was canceled")
+        log.debug("Action ${data.name} of group ${data.groupId} and of type ${data.typeId} was canceled")
         client.setData().forPath(actionPath, ActionStatus.canceled.value.toByteArray())
         data.status = ActionStatus.canceled
     }
index 737f59d..0f23438 100755 (executable)
@@ -24,7 +24,7 @@ import org.apache.amaterasu.leader.Kami
 import org.apache.amaterasu.leader.mesos.schedulers.ClusterScheduler
 import org.apache.mesos.{MesosSchedulerDriver, Protos}
 
-object Launcher extends App with Logging {
+object Launcher extends Logging with App {
 
   println(
     """
index 0ffdb7a..2adff07 100755 (executable)
@@ -20,7 +20,7 @@ import org.apache.amaterasu.common.logging.Logging
 import org.apache.mesos.Protos._
 import org.apache.mesos.{Executor, ExecutorDriver}
 
-object JobExecutor extends Executor with Logging {
+object JobExecutor extends Logging with Executor {
 
   override def shutdown(driver: ExecutorDriver): Unit = {}
 
index 4b1a74c..68c8f85 100755 (executable)
@@ -20,7 +20,7 @@ import org.apache.amaterasu.common.logging.Logging
 import org.apache.mesos.Protos.{Resource, Value}
 import org.apache.mesos.Scheduler
 
-trait AmaterasuScheduler extends Scheduler with Logging {
+trait AmaterasuScheduler extends Logging with Scheduler {
 
   def createScalarResource(name: String, value: Double): Resource = {
     Resource.newBuilder
index d1d0c53..38c90c7 100644 (file)
@@ -22,7 +22,7 @@ import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.logging.Logging
 
 
-abstract class BaseJobLauncher extends App with Logging {
+abstract class BaseJobLauncher extends Logging with App {
 
   def run(args: Args, config: ClusterConfig, resume: Boolean): Unit = ???
 
index 2870ef7..23700f8 100644 (file)
@@ -53,7 +53,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 import scala.util.{Failure, Success}
 
-class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
+class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler {
 
   var capability: Resource = _
 
@@ -457,7 +457,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
   }
 }
 
-object ApplicationMaster extends App with Logging {
+object ApplicationMaster extends Logging with App {
 
 
   val parser = Args.getParser
index 14c4f43..23f4af6 100644 (file)
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.records.{ContainerId, ContainerStatus}
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync
 
 
-class YarnNMCallbackHandler extends NMClientAsync.CallbackHandler with Logging {
+class YarnNMCallbackHandler extends Logging with NMClientAsync.CallbackHandler {
 
   override def onStartContainerError(containerId: ContainerId, t: Throwable): Unit = {
     log.error(s"Container ${containerId.getContainerId} couldn't start.", t)
index 17b3a68..379dd1b 100644 (file)
@@ -41,7 +41,7 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync,
                             env: String,
                             awsEnv: String,
                             config: ClusterConfig,
-                            executorJar: LocalResource) extends AMRMClientAsync.CallbackHandler with Logging {
+                            executorJar: LocalResource) extends Logging with AMRMClientAsync.CallbackHandler {
 
 
   val gson:Gson = new Gson()