AMATERASU-24 - Refactor Spark out of Amaterasu executor to its own project 27/head
author    Arun Manivannan <arun@arunma.com>
Thu, 28 Jun 2018 05:37:14 +0000 (13:37 +0800)
committer Arun Manivannan <arun@arunma.com>
Thu, 28 Jun 2018 05:37:14 +0000 (13:37 +0800)
47 files changed:
frameworks/spark/runner/build.gradle [new file with mode: 0644]
frameworks/spark/runner/src/main/java/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkEntryPoint.java [new file with mode: 0755]
frameworks/spark/runner/src/main/resources/codegen.py [new file with mode: 0644]
frameworks/spark/runner/src/main/resources/runtime.py [new file with mode: 0644]
frameworks/spark/runner/src/main/resources/spark-version-info.properties [new file with mode: 0644]
frameworks/spark/runner/src/main/resources/spark_intp.py [new file with mode: 0755]
frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/SparkRunnersProvider.scala [new file with mode: 0644]
frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkExecutionQueue.scala [new file with mode: 0755]
frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkResultQueue.scala [new file with mode: 0755]
frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkRunner.scala [new file with mode: 0644]
frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/ResultQueue.scala [new file with mode: 0755]
frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/AmaSparkILoop.scala [new file with mode: 0755]
frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkRunnerHelper.scala [new file with mode: 0644]
frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkScalaRunner.scala [new file with mode: 0755]
frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/sparkr/SparkRRunner.scala [new file with mode: 0644]
frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/sparksql/SparkSqlRunner.scala [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/SparkSql/json/SparkSqlTestData.json [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/SparkSql/parquet/_SUCCESS [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/amaterasu.properties [new file with mode: 0755]
frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/py4j.tar.gz [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/pyspark-with-amacontext.py [new file with mode: 0755]
frameworks/spark/runner/src/test/resources/pyspark.tar.gz [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/pyspark.zip [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/simple-pyspark.py [new file with mode: 0755]
frameworks/spark/runner/src/test/resources/simple-python-err.py [new file with mode: 0755]
frameworks/spark/runner/src/test/resources/simple-python.py [new file with mode: 0755]
frameworks/spark/runner/src/test/resources/simple-spark.scala [new file with mode: 0755]
frameworks/spark/runner/src/test/resources/step-2.scala [new file with mode: 0755]
frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet [new file with mode: 0644]
frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet [new file with mode: 0644]
frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/SparkTestsSuite.scala [new file with mode: 0644]
frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/RunnersLoadingTests.scala [new file with mode: 0644]
frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkRunnerTests.scala [new file with mode: 0755]
frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkScalaRunnerTests.scala [new file with mode: 0755]
frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/sparksql/SparkSqlRunnerTests.scala [new file with mode: 0644]
frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala [new file with mode: 0644]
frameworks/spark/runtime/build.gradle [new file with mode: 0644]
frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/framework/spark/runtime/AmaContext.scala [new file with mode: 0644]
gradle/wrapper/gradle-wrapper.properties

diff --git a/frameworks/spark/runner/build.gradle b/frameworks/spark/runner/build.gradle
new file mode 100644 (file)
index 0000000..cc6c902
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+    id 'com.github.johnrengelman.shadow' version '1.2.4'
+    id 'com.github.maiflai.scalatest' version '0.6-5-g9065d91'
+    id 'scala'
+    id 'java'
+}
+
+shadowJar {
+    zip64 true
+}
+
+repositories {
+    maven {
+        url "https://plugins.gradle.org/m2/"
+    }
+    mavenCentral()
+}
+
+test {
+    maxParallelForks = 1
+    forkEvery = 1
+}
+
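+// 'provided' emulates a compile-only scope: the Spark jars below are needed to
+// compile and test, but are not bundled into the shadow jar because the
+// cluster supplies them at runtime.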
+configurations {
+    provided
+}
+
+sourceSets {
+    main.compileClasspath += configurations.provided
+    test.compileClasspath += configurations.provided
+    test.runtimeClasspath += configurations.provided
+}
+
+dependencies {
+
+    compile project(':executor')
+    compile project(':spark-runtime')
+    compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8'
+    compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
+    compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8'
+    compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final'
+    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
+    compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
+    compile group: 'org.reflections', name: 'reflections', version: '0.9.10'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.5'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.5'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.5'
+    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.5'
+    compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
+
+    compile('com.jcabi:jcabi-aether:0.10.1') {
+        exclude group: 'org.jboss.netty'
+    }
+    compile('org.apache.activemq:activemq-client:5.15.2') {
+        exclude group: 'org.jboss.netty'
+    }
+
+    //compile project(':common')
+    //compile project(':amaterasu-sdk')
+
+    //runtime dependency for spark
+    provided('org.apache.spark:spark-repl_2.11:2.2.1')
+    provided('org.apache.spark:spark-core_2.11:2.2.1')
+
+    testCompile project(':common')
+    testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
+    testRuntime 'org.pegdown:pegdown:1.1.0'
+    testCompile 'junit:junit:4.11'
+    testCompile 'org.scalatest:scalatest_2.11:3.0.2'
+    testCompile 'org.scala-lang:scala-library:2.11.8'
+    testCompile('org.apache.spark:spark-repl_2.11:2.2.1')
+    testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9'
+
+}
+
+sourceSets {
+    test {
+        resources.srcDirs += [file('src/test/resources')]
+    }
+
+    main {
+        scala {
+            srcDirs = ['src/main/scala', 'src/main/java']
+        }
+        java {
+            srcDirs = []
+        }
+    }
+}
+
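+// Stages the shadow jar and main resources into the distribution directory.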
+task copyToHome(type: Copy) {
+    dependsOn shadowJar
+    from 'build/libs'
+    from 'build/resources/main'
+    into '../../../build/amaterasu/dist'
+}
diff --git a/frameworks/spark/runner/src/main/java/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkEntryPoint.java b/frameworks/spark/runner/src/main/java/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkEntryPoint.java
new file mode 100755 (executable)
index 0000000..6b79b2f
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner.pyspark;
+
+import org.apache.amaterasu.common.runtime.Environment;
+import org.apache.amaterasu.framework.spark.runtime.AmaContext;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import py4j.GatewayServer;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PySparkEntryPoint {
+
+    //private static Boolean started = false;
+    private static PySparkExecutionQueue queue = new PySparkExecutionQueue();
+    private static ConcurrentHashMap<String, ResultQueue> resultQueues = new ConcurrentHashMap<>();
+
+    private static int port = 0;
+    private static SparkSession sparkSession = null;
+    private static JavaSparkContext jsc = null;
+    private static SQLContext sqlContext = null;
+    private static SparkEnv sparkEnv = null;
+
+    public static PySparkExecutionQueue getExecutionQueue() {
+        return queue;
+    }
+
+    public static ResultQueue getResultQueue(String actionName) {
+        resultQueues.putIfAbsent(actionName, new ResultQueue());
+        return resultQueues.get(actionName);
+    }
+
+    public static JavaSparkContext getJavaSparkContext() {
+        SparkEnv.set(sparkEnv);
+        return jsc;
+    }
+
+    public static String getJobId(){
+        return AmaContext.jobId();
+    }
+
+    public static Environment getEnv(){
+        return AmaContext.env();
+    }
+
+    public static SQLContext getSqlContext() {
+        SparkEnv.set(sparkEnv);
+        return sqlContext;
+    }
+
+    public static SparkSession getSparkSession() {
+        SparkEnv.set(sparkEnv);
+        return sparkSession;
+    }
+
+    public static SparkConf getSparkConf() {
+        return jsc.getConf();
+    }
+
+    private static void generatePort() {
+
+        try {
+
+            ServerSocket socket = new ServerSocket(0);
+            port = socket.getLocalPort();
+
+            socket.close();
+
+        } catch (IOException e) {
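+            // nothing recoverable here: port simply stays 0 if no ephemeral
+            // port could be allocated, and callers observe that via getPort()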
+        }
+
+    }
+
+    public static int getPort() {
+        return port;
+    }
+
+    public static void start(SparkSession spark,
+                             String jobName,
+                             Environment env,
+                             SparkEnv sparkEnv) {
+
+        AmaContext.init(spark, jobName, env);
+
+        sparkSession = spark;
+        jsc = new JavaSparkContext(spark.sparkContext());
+        sqlContext = spark.sqlContext();
+        PySparkEntryPoint.sparkEnv = sparkEnv;
+        generatePort();
+        GatewayServer gatewayServer = new GatewayServer(new PySparkEntryPoint(), port);
+
+        gatewayServer.start();
+    }
+}
diff --git a/frameworks/spark/runner/src/main/resources/codegen.py b/frameworks/spark/runner/src/main/resources/codegen.py
new file mode 100644 (file)
index 0000000..113d9be
--- /dev/null
@@ -0,0 +1,577 @@
+"""
+    codegen
+    ~~~~~~~
+
+    Extension to ast that allow ast -> python code generation.
+
+    :copyright: Copyright 2008 by Armin Ronacher.
+    :license: BSD.
+"""
+from ast import *
+
+BINOP_SYMBOLS = {}
+BINOP_SYMBOLS[Add] = '+'
+BINOP_SYMBOLS[Sub] = '-'
+BINOP_SYMBOLS[Mult] = '*'
+BINOP_SYMBOLS[Div] = '/'
+BINOP_SYMBOLS[Mod] = '%'
+BINOP_SYMBOLS[Pow] = '**'
+BINOP_SYMBOLS[LShift] = '<<'
+BINOP_SYMBOLS[RShift] = '>>'
+BINOP_SYMBOLS[BitOr] = '|'
+BINOP_SYMBOLS[BitXor] = '^'
+BINOP_SYMBOLS[BitAnd] = '&'
+BINOP_SYMBOLS[FloorDiv] = '//'
+
+BOOLOP_SYMBOLS = {}
+BOOLOP_SYMBOLS[And] = 'and'
+BOOLOP_SYMBOLS[Or] = 'or'
+
+CMPOP_SYMBOLS = {}
+CMPOP_SYMBOLS[Eq] = '=='
+CMPOP_SYMBOLS[NotEq] = '!='
+CMPOP_SYMBOLS[Lt] = '<'
+CMPOP_SYMBOLS[LtE] = '<='
+CMPOP_SYMBOLS[Gt] = '>'
+CMPOP_SYMBOLS[GtE] = '>='
+CMPOP_SYMBOLS[Is] = 'is'
+CMPOP_SYMBOLS[IsNot] = 'is not'
+CMPOP_SYMBOLS[In] = 'in'
+CMPOP_SYMBOLS[NotIn] = 'not in'
+
+UNARYOP_SYMBOLS = {}
+UNARYOP_SYMBOLS[Invert] = '~'
+UNARYOP_SYMBOLS[Not] = 'not'
+UNARYOP_SYMBOLS[UAdd] = '+'
+UNARYOP_SYMBOLS[USub] = '-'
+
+
+def to_source(node, indent_with=' ' * 4, add_line_information=False):
+    """This function can convert a node tree back into python sourcecode.
+    This is useful for debugging purposes, especially if you're dealing with
+    custom asts not generated by python itself.
+
+    It could be that the sourcecode is evaluable when the AST itself is not
+    compilable / evaluable.  The reason for this is that the AST contains some
+    more data than regular sourcecode does, which is dropped during
+    conversion.
+
+    Each level of indentation is replaced with `indent_with`.  Per default this
+    parameter is equal to four spaces as suggested by PEP 8, but it might be
+    adjusted to match the application's styleguide.
+
+    If `add_line_information` is set to `True` comments for the line numbers
+    of the nodes are added to the output.  This can be used to spot wrong line
+    number information of statement nodes.
+    """
+    generator = SourceGenerator(indent_with, add_line_information)
+    generator.visit(node)
+
+    return ''.join(generator.result)
+
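+# Illustrative round-trip (hypothetical snippet, not part of the vendored
+# module): regenerate source for a freshly parsed tree.
+#
+#     import ast
+#     tree = ast.parse('x = 1 + 2')
+#     print to_source(tree)        # -> x = 1 + 2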
+class SourceGenerator(NodeVisitor):
+    """This visitor is able to transform a well formed syntax tree into python
+    sourcecode.  For more details have a look at the docstring of the
+    `node_to_source` function.
+    """
+
+    def __init__(self, indent_with, add_line_information=False):
+        self.result = []
+        self.indent_with = indent_with
+        self.add_line_information = add_line_information
+        self.indentation = 0
+        self.new_lines = 0
+
+    def write(self, x):
+        if self.new_lines:
+            if self.result:
+                self.result.append('\n' * self.new_lines)
+            self.result.append(self.indent_with * self.indentation)
+            self.new_lines = 0
+        self.result.append(x)
+
+    def newline(self, node=None, extra=0):
+        self.new_lines = max(self.new_lines, 1 + extra)
+        if node is not None and self.add_line_information:
+            self.write('# line: %s' % node.lineno)
+            self.new_lines = 1
+
+    def body(self, statements):
+        self.new_line = True
+        self.indentation += 1
+        for stmt in statements:
+            self.visit(stmt)
+        self.indentation -= 1
+
+    def body_or_else(self, node):
+        self.body(node.body)
+        if node.orelse:
+            self.newline()
+            self.write('else:')
+            self.body(node.orelse)
+
+    def signature(self, node):
+        want_comma = []
+        def write_comma():
+            if want_comma:
+                self.write(', ')
+            else:
+                want_comma.append(True)
+
+        padding = [None] * (len(node.args) - len(node.defaults))
+        for arg, default in zip(node.args, padding + node.defaults):
+            write_comma()
+            self.visit(arg)
+            if default is not None:
+                self.write('=')
+                self.visit(default)
+        if node.vararg is not None:
+            write_comma()
+            self.write('*' + node.vararg)
+        if node.kwarg is not None:
+            write_comma()
+            self.write('**' + node.kwarg)
+
+    def decorators(self, node):
+        for decorator in node.decorator_list:
+            self.newline(decorator)
+            self.write('@')
+            self.visit(decorator)
+
+    # Statements
+
+    def visit_Assert(self, node):
+        self.newline(node)
+        self.write('assert ')
+        self.visit(node.test)
+        if node.msg is not None:
+           self.write(', ')
+           self.visit(node.msg)
+
+    def visit_Assign(self, node):
+        self.newline(node)
+        for idx, target in enumerate(node.targets):
+            if idx:
+                self.write(', ')
+            self.visit(target)
+        self.write(' = ')
+        self.visit(node.value)
+
+    def visit_AugAssign(self, node):
+        self.newline(node)
+        self.visit(node.target)
+        self.write(' ' + BINOP_SYMBOLS[type(node.op)] + '= ')
+        self.visit(node.value)
+
+    def visit_ImportFrom(self, node):
+        self.newline(node)
+        self.write('from %s%s import ' % ('.' * node.level, node.module))
+        for idx, item in enumerate(node.names):
+            if idx:
+                self.write(', ')
+            self.write(item)
+
+    def visit_Import(self, node):
+        self.newline(node)
+        for item in node.names:
+            self.write('import ')
+            self.visit(item)
+
+    def visit_Expr(self, node):
+        self.newline(node)
+        self.generic_visit(node)
+
+    def visit_FunctionDef(self, node):
+        self.newline(extra=1)
+        self.decorators(node)
+        self.newline(node)
+        self.write('def %s(' % node.name)
+        self.visit(node.args)
+        self.write('):')
+        self.body(node.body)
+
+    def visit_ClassDef(self, node):
+        have_args = []
+        def paren_or_comma():
+            if have_args:
+                self.write(', ')
+            else:
+                have_args.append(True)
+                self.write('(')
+
+        self.newline(extra=2)
+        self.decorators(node)
+        self.newline(node)
+        self.write('class %s' % node.name)
+        for base in node.bases:
+            paren_or_comma()
+            self.visit(base)
+        # XXX: the if here is used to keep this module compatible
+        #      with python 2.6.
+        if hasattr(node, 'keywords'):
+            for keyword in node.keywords:
+                paren_or_comma()
+                self.write(keyword.arg + '=')
+                self.visit(keyword.value)
+            if node.starargs is not None:
+                paren_or_comma()
+                self.write('*')
+                self.visit(node.starargs)
+            if node.kwargs is not None:
+                paren_or_comma()
+                self.write('**')
+                self.visit(node.kwargs)
+        self.write(have_args and '):' or ':')
+        self.body(node.body)
+
+    def visit_If(self, node):
+        self.newline(node)
+        self.write('if ')
+        self.visit(node.test)
+        self.write(':')
+        self.body(node.body)
+        while True:
+            else_ = node.orelse
+            if len(else_) == 0:
+                break
+            elif len(else_) == 1 and isinstance(else_[0], If):
+                node = else_[0]
+                self.newline()
+                self.write('elif ')
+                self.visit(node.test)
+                self.write(':')
+                self.body(node.body)
+            else:
+                self.newline()
+                self.write('else:')
+                self.body(else_)
+                break
+
+    def visit_For(self, node):
+        self.newline(node)
+        self.write('for ')
+        self.visit(node.target)
+        self.write(' in ')
+        self.visit(node.iter)
+        self.write(':')
+        self.body_or_else(node)
+
+    def visit_While(self, node):
+        self.newline(node)
+        self.write('while ')
+        self.visit(node.test)
+        self.write(':')
+        self.body_or_else(node)
+
+    def visit_With(self, node):
+        self.newline(node)
+        self.write('with ')
+        self.visit(node.context_expr)
+        if node.optional_vars is not None:
+            self.write(' as ')
+            self.visit(node.optional_vars)
+        self.write(':')
+        self.body(node.body)
+
+    def visit_Pass(self, node):
+        self.newline(node)
+        self.write('pass')
+
+    def visit_Print(self, node):
+        # XXX: python 2.6 only
+        self.newline(node)
+        self.write('print ')
+        want_comma = False
+        if node.dest is not None:
+            self.write(' >> ')
+            self.visit(node.dest)
+            want_comma = True
+        for value in node.values:
+            if want_comma:
+                self.write(', ')
+            self.visit(value)
+            want_comma = True
+        if not node.nl:
+            self.write(',')
+
+    def visit_Delete(self, node):
+        self.newline(node)
+        self.write('del ')
+        for idx, target in enumerate(node.targets):
+            if idx:
+                self.write(', ')
+            self.visit(target)
+
+    def visit_TryExcept(self, node):
+        self.newline(node)
+        self.write('try:')
+        self.body(node.body)
+        for handler in node.handlers:
+            self.visit(handler)
+
+    def visit_TryFinally(self, node):
+        self.newline(node)
+        self.write('try:')
+        self.body(node.body)
+        self.newline(node)
+        self.write('finally:')
+        self.body(node.finalbody)
+
+    def visit_Global(self, node):
+        self.newline(node)
+        self.write('global ' + ', '.join(node.names))
+
+    def visit_Nonlocal(self, node):
+        self.newline(node)
+        self.write('nonlocal ' + ', '.join(node.names))
+
+    def visit_Return(self, node):
+        self.newline(node)
+        if node.value is None:
+            self.write('return')
+        else:
+            self.write('return ')
+            self.visit(node.value)
+
+    def visit_Break(self, node):
+        self.newline(node)
+        self.write('break')
+
+    def visit_Continue(self, node):
+        self.newline(node)
+        self.write('continue')
+
+    def visit_Raise(self, node):
+        # XXX: Python 2.6 / 3.0 compatibility
+        self.newline(node)
+        self.write('raise')
+        if hasattr(node, 'exc') and node.exc is not None:
+            self.write(' ')
+            self.visit(node.exc)
+            if node.cause is not None:
+                self.write(' from ')
+                self.visit(node.cause)
+        elif hasattr(node, 'type') and node.type is not None:
+            self.visit(node.type)
+            if node.inst is not None:
+                self.write(', ')
+                self.visit(node.inst)
+            if node.tback is not None:
+                self.write(', ')
+                self.visit(node.tback)
+
+    # Expressions
+
+    def visit_Attribute(self, node):
+        self.visit(node.value)
+        self.write('.' + node.attr)
+
+    def visit_Call(self, node):
+        want_comma = []
+        def write_comma():
+            if want_comma:
+                self.write(', ')
+            else:
+                want_comma.append(True)
+
+        self.visit(node.func)
+        self.write('(')
+        for arg in node.args:
+            write_comma()
+            self.visit(arg)
+        for keyword in node.keywords:
+            write_comma()
+            self.write(keyword.arg + '=')
+            self.visit(keyword.value)
+        if node.starargs is not None:
+            write_comma()
+            self.write('*')
+            self.visit(node.starargs)
+        if node.kwargs is not None:
+            write_comma()
+            self.write('**')
+            self.visit(node.kwargs)
+        self.write(')')
+
+    def visit_Name(self, node):
+        self.write(node.id)
+
+    def visit_Str(self, node):
+        self.write(repr(node.s))
+
+    def visit_Bytes(self, node):
+        self.write(repr(node.s))
+
+    def visit_Num(self, node):
+        self.write(repr(node.n))
+
+    def visit_Tuple(self, node):
+        self.write('(')
+        idx = -1
+        for idx, item in enumerate(node.elts):
+            if idx:
+                self.write(', ')
+            self.visit(item)
+        self.write(idx and ')' or ',)')
+
+    def sequence_visit(left, right):
+        def visit(self, node):
+            self.write(left)
+            for idx, item in enumerate(node.elts):
+                if idx:
+                    self.write(', ')
+                self.visit(item)
+            self.write(right)
+        return visit
+
+    visit_List = sequence_visit('[', ']')
+    visit_Set = sequence_visit('{', '}')
+    del sequence_visit
+
+    def visit_Dict(self, node):
+        self.write('{')
+        for idx, (key, value) in enumerate(zip(node.keys, node.values)):
+            if idx:
+                self.write(', ')
+            self.visit(key)
+            self.write(': ')
+            self.visit(value)
+        self.write('}')
+
+    def visit_BinOp(self, node):
+        self.visit(node.left)
+        self.write(' %s ' % BINOP_SYMBOLS[type(node.op)])
+        self.visit(node.right)
+
+    def visit_BoolOp(self, node):
+        self.write('(')
+        for idx, value in enumerate(node.values):
+            if idx:
+                self.write(' %s ' % BOOLOP_SYMBOLS[type(node.op)])
+            self.visit(value)
+        self.write(')')
+
+    def visit_Compare(self, node):
+        self.write('(')
+        self.visit(node.left)
+        for op, right in zip(node.ops, node.comparators):
+            self.write(' %s ' % CMPOP_SYMBOLS[type(op)])
+            self.visit(right)
+        self.write(')')
+
+    def visit_UnaryOp(self, node):
+        self.write('(')
+        op = UNARYOP_SYMBOLS[type(node.op)]
+        self.write(op)
+        if op == 'not':
+            self.write(' ')
+        self.visit(node.operand)
+        self.write(')')
+
+    def visit_Subscript(self, node):
+        self.visit(node.value)
+        self.write('[')
+        self.visit(node.slice)
+        self.write(']')
+
+    def visit_Slice(self, node):
+        if node.lower is not None:
+            self.visit(node.lower)
+        self.write(':')
+        if node.upper is not None:
+            self.visit(node.upper)
+        if node.step is not None:
+            self.write(':')
+            if not (isinstance(node.step, Name) and node.step.id == 'None'):
+                self.visit(node.step)
+
+    def visit_ExtSlice(self, node):
+        for idx, item in enumerate(node.dims):
+            if idx:
+                self.write(', ')
+            self.visit(item)
+
+    def visit_Yield(self, node):
+        self.write('yield ')
+        self.visit(node.value)
+
+    def visit_Lambda(self, node):
+        self.write('lambda ')
+        self.visit(node.args)
+        self.write(': ')
+        self.visit(node.body)
+
+    def visit_Ellipsis(self, node):
+        self.write('Ellipsis')
+
+    def generator_visit(left, right):
+        def visit(self, node):
+            self.write(left)
+            self.visit(node.elt)
+            for comprehension in node.generators:
+                self.visit(comprehension)
+            self.write(right)
+        return visit
+
+    visit_ListComp = generator_visit('[', ']')
+    visit_GeneratorExp = generator_visit('(', ')')
+    visit_SetComp = generator_visit('{', '}')
+    del generator_visit
+
+    def visit_DictComp(self, node):
+        self.write('{')
+        self.visit(node.key)
+        self.write(': ')
+        self.visit(node.value)
+        for comprehension in node.generators:
+            self.visit(comprehension)
+        self.write('}')
+
+    def visit_IfExp(self, node):
+        self.visit(node.body)
+        self.write(' if ')
+        self.visit(node.test)
+        self.write(' else ')
+        self.visit(node.orelse)
+
+    def visit_Starred(self, node):
+        self.write('*')
+        self.visit(node.value)
+
+    def visit_Repr(self, node):
+        # XXX: python 2.6 only
+        self.write('`')
+        self.visit(node.value)
+        self.write('`')
+
+    # Helper Nodes
+
+    def visit_alias(self, node):
+        self.write(node.name)
+        if node.asname is not None:
+            self.write(' as ' + node.asname)
+
+    def visit_comprehension(self, node):
+        self.write(' for ')
+        self.visit(node.target)
+        self.write(' in ')
+        self.visit(node.iter)
+        if node.ifs:
+            for if_ in node.ifs:
+                self.write(' if ')
+                self.visit(if_)
+
+    def visit_excepthandler(self, node):
+        self.newline(node)
+        self.write('except')
+        if node.type is not None:
+            self.write(' ')
+            self.visit(node.type)
+            if node.name is not None:
+                self.write(' as ')
+                self.visit(node.name)
+        self.write(':')
+        self.body(node.body)
+
+    def visit_arguments(self, node):
+        self.signature(node)
diff --git a/frameworks/spark/runner/src/main/resources/runtime.py b/frameworks/spark/runner/src/main/resources/runtime.py
new file mode 100644 (file)
index 0000000..d01664c
--- /dev/null
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+class AmaContext(object):
+
+    def __init__(self, sc, spark, job_id, env):
+        self.sc = sc
+        self.spark = spark
+        self.job_id = job_id
+        self.env = env
+
+    def get_dataframe(self, action_name, dataset_name, format="parquet"):
+        return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
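+
+    # Illustrative call from an action, assuming a dataset named "x" was
+    # exported by a previous action named "start" (names are hypothetical):
+    #   df = ama_context.get_dataframe("start", "x")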
+
+class Environment(object):
+
+    def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
+        self.name = name
+        self.master = master
+        self.input_root_path = input_root_path
+        self.output_root_path = output_root_path
+        self.working_dir = working_dir
+        self.configuration = configuration
diff --git a/frameworks/spark/runner/src/main/resources/spark-version-info.properties b/frameworks/spark/runner/src/main/resources/spark-version-info.properties
new file mode 100644 (file)
index 0000000..ce0b312
--- /dev/null
@@ -0,0 +1,11 @@
+version=2.1.0-SNAPSHOT
+
+user=root
+
+revision=738b4cc548ca48c010b682b8bc19a2f7e1947cfe
+
+branch=master
+
+date=2016-07-27T11:23:21Z
+
+url=https://github.com/apache/spark.git
diff --git a/frameworks/spark/runner/src/main/resources/spark_intp.py b/frameworks/spark/runner/src/main/resources/spark_intp.py
new file mode 100755 (executable)
index 0000000..f3c9fc0
--- /dev/null
@@ -0,0 +1,110 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import ast
+import codegen
+import os
+import sys
+import zipimport
+sys.path.append(os.getcwd())
+from runtime import AmaContext, Environment
+
+# os.chdir(os.getcwd() + '/build/resources/test/')
+# import zipfile
+# zip = zipfile.ZipFile('pyspark.zip')
+# zip.extractall()
+# zip = zipfile.ZipFile('py4j-0.10.4-src.zip', 'r')
+# zip.extractall()
+# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
+# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
+
+# py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
+# py4j_importer = zipimport.zipimporter(py4j_path)
+# py4j = py4j_importer.load_module('py4j')
+from py4j.java_gateway import JavaGateway, GatewayClient, java_import
+from py4j.protocol import Py4JJavaError
+from pyspark.conf import SparkConf
+from pyspark.context import SparkContext
+from pyspark.rdd import RDD
+from pyspark.files import SparkFiles
+from pyspark.storagelevel import StorageLevel
+from pyspark import accumulators
+from pyspark.accumulators import Accumulator, AccumulatorParam
+from pyspark.broadcast import Broadcast
+from pyspark.serializers import MarshalSerializer, PickleSerializer
+from pyspark.sql import SparkSession
+from pyspark.sql import Row
+
+client = GatewayClient(port=int(sys.argv[1]))
+gateway = JavaGateway(client, auto_convert=True)
+entry_point = gateway.entry_point
+queue = entry_point.getExecutionQueue()
+
+java_import(gateway.jvm, "org.apache.spark.SparkEnv")
+java_import(gateway.jvm, "org.apache.spark.SparkConf")
+java_import(gateway.jvm, "org.apache.spark.api.java.*")
+java_import(gateway.jvm, "org.apache.spark.api.python.*")
+java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
+java_import(gateway.jvm, "org.apache.spark.sql.*")
+java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
+java_import(gateway.jvm, "scala.Tuple2")
+
+jconf = entry_point.getSparkConf()
+jsc = entry_point.getJavaSparkContext()
+
+job_id = entry_point.getJobId()
+javaEnv = entry_point.getEnv()
+
+env = Environment(javaEnv.name(), javaEnv.master(), javaEnv.inputRootPath(), javaEnv.outputRootPath(), javaEnv.workingDir(), javaEnv.configuration())
+conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
+conf.setExecutorEnv('PYTHONPATH', ':'.join(sys.path))
+sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
+spark = SparkSession(sc, entry_point.getSparkSession())
+
+ama_context = AmaContext(sc, spark, job_id, env)
+
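+# Main execution loop: pull (source, action name, exports) tuples off the
+# Java-side execution queue, execute each top-level statement in this
+# interpreter, and report per-statement results back to the matching result
+# queue.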
+while True:
+    actionData = queue.getNext()
+    resultQueue = entry_point.getResultQueue(actionData._2())
+    actionSource = actionData._1()
+    tree = ast.parse(actionSource)
+    exports = actionData._3()
+
+    for node in tree.body:
+
+        wrapper = ast.Module(body=[node])
+        try:
+            co = compile(wrapper, "<ast>", 'exec')
+            exec (co)
+            resultQueue.put('success', actionData._2(), codegen.to_source(node), '')
+
+            #if this node is an assignment, we need to check if it needs to be persisted
+            try:
+                persistCode = ''
+                if(isinstance(node,ast.Assign)):
+                    varName = node.targets[0].id
+                    if(exports.containsKey(varName)):
+                        persistCode = varName + ".write.save(\"" + env.working_dir + "/" + job_id + "/" + actionData._2() + "/" + varName + "\", format=\"" + exports[varName] + "\", mode='overwrite')"
+                        persist = compile(persistCode, '<stdin>', 'exec')
+                        exec(persist)
+
+            except:
+                resultQueue.put('error', actionData._2(), persistCode, str(sys.exc_info()[1]))
+        except:
+            resultQueue.put('error', actionData._2(), codegen.to_source(node), str(sys.exc_info()[1]))
+    resultQueue.put('completion', '', '', '')
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/SparkRunnersProvider.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/SparkRunnersProvider.scala
new file mode 100644 (file)
index 0000000..652f32b
--- /dev/null
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner
+
+import java.io._
+
+import com.jcabi.aether.Aether
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.ExecData
+import org.apache.amaterasu.common.execution.actions.Notifier
+import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage}
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.framework.spark.runner.pyspark.PySparkRunner
+import org.apache.amaterasu.framework.spark.runner.repl.{SparkRunnerHelper, SparkScalaRunner}
+import org.apache.amaterasu.framework.spark.runner.sparksql.SparkSqlRunner
+import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider}
+import org.eclipse.aether.util.artifact.JavaScopes
+import org.sonatype.aether.repository.RemoteRepository
+import org.sonatype.aether.util.artifact.DefaultArtifact
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.sys.process._
+
+class SparkRunnersProvider extends RunnersProvider with Logging {
+
+  private val runners = new TrieMap[String, AmaterasuRunner]
+  private var shellLoger = ProcessLogger(
+    (o: String) => log.info(o),
+    (e: String) => log.error(e)
+  )
+  private var conf: Option[Map[String, Any]] = _
+  private var executorEnv: Option[Map[String, Any]] = _
+  private var clusterConfig: ClusterConfig = _
+
+  override def init(execData: ExecData,
+                    jobId: String,
+                    outStream: ByteArrayOutputStream,
+                    notifier: Notifier,
+                    executorId: String,
+                    config: ClusterConfig,
+                    hostName: String): Unit = {
+
+    shellLoger = ProcessLogger(
+      (o: String) => log.info(o),
+      (e: String) => log.error("", e)
+    )
+    clusterConfig = config
+    var jars = Seq.empty[String]
+
+    if (execData.deps != null) {
+      jars ++= getDependencies(execData.deps)
+    }
+
+    if (execData.pyDeps != null &&
+      execData.pyDeps.packages.nonEmpty) {
+      loadPythonDependencies(execData.pyDeps, notifier)
+    }
+
+    conf = execData.configurations.get("spark")
+    executorEnv = execData.configurations.get("spark_exec_env")
+    val sparkAppName = s"job_${jobId}_executor_$executorId"
+
+    SparkRunnerHelper.notifier = notifier
+    val spark = SparkRunnerHelper.createSpark(execData.env, sparkAppName, jars, conf, executorEnv, config, hostName)
+
+    lazy val sparkScalaRunner = SparkScalaRunner(execData.env, jobId, spark, outStream, notifier, jars)
+    sparkScalaRunner.initializeAmaContext(execData.env)
+
+    runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
+    var pypath = ""
+    // TODO: get rid of hard-coded version
+    config.mode match {
+      case "yarn" =>
+        pypath = s"$$PYTHONPATH:$$SPARK_HOME/python:$$SPARK_HOME/python/build:${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip:${new File(".").getAbsolutePath}"
+      case "mesos" =>
+        pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
+    }
+    lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, pypath, execData.pyDeps, config)
+    runners.put(pySparkRunner.getIdentifier, pySparkRunner)
+
+    lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark)
+    runners.put(sparkSqlRunner.getIdentifier, sparkSqlRunner)
+  }
+
+  private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
+    val channel = pythonPackage.channel.getOrElse("anaconda")
+    if (channel == "anaconda") {
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
+    } else {
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
+    }
+  }
+
+  private def installAnacondaOnNode(): Unit = {
+    // TODO: get rid of hard-coded version
+
+    this.clusterConfig.mode match {
+      case "yarn" => Seq("sh", "-c", "export HOME=$PWD && ./miniconda.sh -b -p miniconda") ! shellLoger
+      case "mesos" => Seq("sh", "miniconda.sh", "-b", "-p", "miniconda") ! shellLoger
+    }
+
+    Seq("bash", "-c", "export HOME=$PWD && ./miniconda/bin/python -m conda install -y conda-build") ! shellLoger
+    Seq("bash", "-c", "ln -s spark/python/pyspark miniconda/pkgs/pyspark") ! shellLoger
+  }
+
+  private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
+    notifier.info("loading Anaconda env")
+    installAnacondaOnNode()
+    val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
+    installAnacondaPackage(codegenPackage)
+    try {
+      // notifier.info("loadPythonDependencies #5")
+      deps.packages.foreach(pack => {
+        pack.index.getOrElse("anaconda").toLowerCase match {
+          case "anaconda" => installAnacondaPackage(pack)
+          // case "pypi" => installPyPiPackage(pack)
+        }
+      })
+    }
+    catch {
+
+      case rte: RuntimeException =>
+        val sw = new StringWriter
+        rte.printStackTrace(new PrintWriter(sw))
+        notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
+      case e: Exception =>
+        val sw = new StringWriter
+        e.printStackTrace(new PrintWriter(sw))
+        notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
+    }
+  }
+
+  override def getGroupIdentifier: String = "spark"
+
+  override def getRunner(id: String): AmaterasuRunner = runners(id)
+
+  private def getDependencies(deps: Dependencies): Seq[String] = {
+
+    // adding a local repo because Aether needs one
+    val repo = new File(System.getProperty("java.io.tmpdir"), "ama-repo")
+
+    val remotes = deps.repos.map(r =>
+      new RemoteRepository(
+        r.id,
+        r.`type`,
+        r.url
+      )).toList.asJava
+
+    val aether = new Aether(remotes, repo)
+
+    deps.artifacts.flatMap(a => {
+      aether.resolve(
+        new DefaultArtifact(a.groupId, a.artifactId, "", "jar", a.version),
+        JavaScopes.RUNTIME
+      )
+    }).map(x => x.getFile.getAbsolutePath)
+
+  }
+}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkExecutionQueue.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkExecutionQueue.scala
new file mode 100755 (executable)
index 0000000..ddcf66c
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner.pyspark
+
+import java.util
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+
+
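+// Bridges the Scala runner and the Python interpreter: the runner enqueues
+// (source, action name, exports) tuples here, and spark_intp.py polls them
+// through the Py4J gateway.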
+class PySparkExecutionQueue {
+
+  val queue = new LinkedBlockingQueue[(String, String, util.Map[String, String])]()
+
+  def getNext(): (String, String, util.Map[String, String]) = {
+
+    // if the queue is idle for an hour it will return null, which
+    // terminates the python execution; needs revisiting
+    queue.poll(1, TimeUnit.HOURS)
+
+  }
+
+  def setForExec(line: (String, String, util.Map[String, String])) = {
+
+    queue.put(line)
+
+  }
+
+}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkResultQueue.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkResultQueue.scala
new file mode 100755 (executable)
index 0000000..16abbe3
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner.pyspark
+
+import org.apache.amaterasu.framework.spark.runner.pyspark.ResultType.ResultType
+
+object ResultType extends Enumeration {
+  type ResultType = Value
+  val success = Value("success")
+  val error = Value("error")
+  val completion = Value("completion")
+}
+
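+// A single statement-level result reported back from the Python interpreter:
+// which action it belongs to, the statement that ran, and a message (empty on
+// success, the error text otherwise).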
+case class PySparkResult(
+  resultType: ResultType,
+  action: String,
+  statement: String,
+  message: String
+)
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkRunner.scala
new file mode 100644 (file)
index 0000000..c015ec5
--- /dev/null
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner.pyspark
+
+import java.io.File
+import java.util
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.execution.actions.Notifier
+import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.sdk.AmaterasuRunner
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.SparkSession
+
+import scala.sys.process.{Process, ProcessLogger}
+
+
+
+
+class PySparkRunner extends AmaterasuRunner with Logging {
+
+  var proc: Process = _
+  var notifier: Notifier = _
+
+  override def getIdentifier: String = "pyspark"
+
+  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
+    interpretSources(actionSource, actionName, exports)
+  }
+
+  def interpretSources(source: String, actionName: String, exports: util.Map[String, String]): Unit = {
+
+    PySparkEntryPoint.getExecutionQueue.setForExec((source, actionName, exports))
+    val resQueue = PySparkEntryPoint.getResultQueue(actionName)
+
+    notifier.info(s"================= started action $actionName =================")
+
+    var res: PySparkResult = null
+
+    do {
+      res = resQueue.getNext()
+      res.resultType match {
+        case ResultType.success =>
+          notifier.success(res.statement)
+        case ResultType.error =>
+          notifier.error(res.statement, res.message)
+          throw new Exception(res.message)
+        case ResultType.completion =>
+          notifier.info(s"================= finished action $actionName =================")
+      }
+    } while (res != null && res.resultType != ResultType.completion)
+  }
+
+}
+
+object PySparkRunner {
+
+  def collectCondaPackages(): String = {
+    val pkgsDirs = new File("./miniconda/pkgs")
+    (pkgsDirs.listFiles.filter {
+      file => file.getName.endsWith(".tar.bz2")
+    }.map {
+      file => s"./miniconda/pkgs/${file.getName}"
+    }.toBuffer :+ "dist/codegen.py").mkString(",")
+  }
+
+  def apply(env: Environment,
+            jobId: String,
+            notifier: Notifier,
+            spark: SparkSession,
+            pypath: String,
+            pyDeps: PythonDependencies,
+            config: ClusterConfig): PySparkRunner = {
+
+    val shellLoger = ProcessLogger(
+      (o: String) => println(o),
+      (e: String) => println(e)
+    )
+
+    //TODO: can we make this less ugly?
+
+
+    val result = new PySparkRunner
+
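+    // Start the Py4J gateway on an ephemeral port, then launch spark_intp.py
+    // as a subprocess that connects back to it.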
+    PySparkEntryPoint.start(spark, jobId, env, SparkEnv.get)
+    val port = PySparkEntryPoint.getPort
+    var intpPath = ""
+    if (env.configuration.contains("cwd")) {
+      val cwd = new File(env.configuration("cwd"))
+      intpPath = s"${cwd.getAbsolutePath}/spark_intp.py" // This is to support test environment
+    } else {
+      intpPath = s"spark_intp.py"
+    }
+    var pysparkPath = ""
+    var condaPkgs = ""
+    if (pyDeps != null)
+      condaPkgs = collectCondaPackages()
+    var sparkCmd: Seq[String] = Seq()
+    config.mode match {
+      case "yarn" =>
+        pysparkPath = s"spark/bin/spark-submit"
+        sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, "--master", "yarn", intpPath, port.toString)
+        val proc = Process(sparkCmd, None,
+          "PYTHONPATH" -> pypath,
+          "PYTHONHASHSEED" -> 0.toString)
+
+        proc.run(shellLoger)
+      case "mesos" =>
+        pysparkPath = config.pysparkPath
+        if (pysparkPath.endsWith("spark-submit")) {
+          sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, intpPath, port.toString)
+        }
+        else {
+          sparkCmd = Seq(pysparkPath, intpPath, port.toString)
+        }
+        var pysparkPython = "/usr/bin/python"
+
+        if (pyDeps != null &&
+          pyDeps.packages.nonEmpty) {
+          pysparkPython = "./miniconda/bin/python"
+        }
+        val proc = Process(sparkCmd, None,
+          "PYTHONPATH" -> pypath,
+          "PYSPARK_PYTHON" -> pysparkPython,
+          "PYTHONHASHSEED" -> 0.toString)
+
+        proc.run(shellLoger)
+    }
+
+    result.notifier = notifier
+
+    result
+  }
+
+}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/ResultQueue.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/pyspark/ResultQueue.scala
new file mode 100755 (executable)
index 0000000..d0cb4ae
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner.pyspark
+
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+
+
+class ResultQueue {
+  val queue = new LinkedBlockingQueue[PySparkResult]()
+
+  def getNext(): PySparkResult = {
+
+    // if the queue is idle for 10 minutes it will return null, which
+    // terminates the python execution; needs revisiting
+    queue.poll(10, TimeUnit.MINUTES)
+
+  }
+
+  def put(
+    resultType: String,
+    action: String,
+    statement: String,
+    message: String
+  ) = {
+
+    val result = new PySparkResult(ResultType.withName(resultType), action, statement, message)
+    queue.put(result)
+  }
+}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/AmaSparkILoop.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/AmaSparkILoop.scala
new file mode 100755 (executable)
index 0000000..ec874b6
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner.repl
+
+import java.io.PrintWriter
+
+import org.apache.spark.repl.SparkILoop
+
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.IMain
+
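+// A thin SparkILoop wrapper that lets SparkRunnerHelper drive REPL creation,
+// inject compiler Settings and access the underlying IMain instance.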
+class AmaSparkILoop(writer: PrintWriter) extends SparkILoop(None, writer) {
+
+  def create = {
+    this.createInterpreter
+  }
+
+  def setSettings(settings: Settings) = {
+    this.settings = settings
+  }
+
+  def getIntp: IMain = {
+    this.intp
+  }
+
+}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkRunnerHelper.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkRunnerHelper.scala
new file mode 100644 (file)
index 0000000..18658ec
--- /dev/null
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner.repl
+
+import java.io.{ByteArrayOutputStream, File, PrintWriter}
+import java.nio.file.{Files, Paths}
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.execution.actions.Notifier
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.common.utils.FileUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+
+import scala.tools.nsc.GenericRunnerSettings
+import scala.tools.nsc.interpreter.IMain
+
+object SparkRunnerHelper extends Logging {
+
+  private val conf = new SparkConf()
+  private val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
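+  // the REPL compiles generated classes into this temp directory
+  // (rooted at spark.repl.classdir, falling back to java.io.tmpdir)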
+  private val outputDir = Files.createTempDirectory(Paths.get(rootDir), "repl").toFile
+  outputDir.deleteOnExit()
+
+  private var sparkSession: SparkSession = _
+
+  var notifier: Notifier = _
+
+  private var interpreter: IMain = _
+
+  def getNode: String = sys.env.getOrElse("AMA_NODE", "127.0.0.1")
+
+  def getOrCreateScalaInterpreter(outStream: ByteArrayOutputStream, jars: Seq[String], recreate: Boolean = false): IMain = {
+    if (interpreter == null || recreate) {
+      initInterpreter(outStream, jars)
+    }
+    interpreter
+  }
+
+  private def scalaOptionError(msg: String): Unit = {
+    notifier.error("", msg)
+  }
+
+  private def initInterpreter(outStream: ByteArrayOutputStream, jars: Seq[String]): Unit = {
+
+    var result: IMain = null
+    val config = new ClusterConfig()
+    try {
+
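+      // -Yrepl-class-based wraps each interpreted line in a class (as the
+      // Spark REPL expects) and -Yrepl-outdir routes the generated classes
+      // to outputDir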
+      val interpArguments = List(
+        "-Yrepl-class-based",
+        "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
+        "-classpath", jars.mkString(File.separator)
+      )
+
+      val settings = new GenericRunnerSettings(scalaOptionError)
+      settings.processArguments(interpArguments, processAll = true)
+
+      settings.classpath.append(System.getProperty("java.class.path") + java.io.File.pathSeparator +
+        "spark-" + config.Webserver.sparkVersion + "/jars/*" + java.io.File.pathSeparator +
+        jars.mkString(java.io.File.pathSeparator))
+
+      settings.usejavacp.value = true
+
+      val out = new PrintWriter(outStream)
+      val interpreter = new AmaSparkILoop(out)
+      interpreter.setSettings(settings)
+
+      interpreter.create
+
+      val intp = interpreter.getIntp
+
+      settings.embeddedDefaults(Thread.currentThread().getContextClassLoader)
+      intp.setContextClassLoader
+      intp.initializeSynchronous
+
+      result = intp
+    }
+    catch {
+      case e: Exception =>
+        log.error(s"error initializing the Spark interpreter: ${new String(outStream.toByteArray)}", e)
+    }
+
+    interpreter = result
+  }
+
+
+  def createSpark(env: Environment,
+                  sparkAppName: String,
+                  jars: Seq[String],
+                  sparkConf: Option[Map[String, Any]],
+                  executorEnv: Option[Map[String, Any]],
+                  config: ClusterConfig,
+                  hostName: String): SparkSession = {
+
+    Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
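+    // collect python files installed under miniconda (if present) so they
+    // can be shipped with the job via spark.submit.pyFiles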
+    val minicondaPkgsPath = "miniconda/pkgs"
+    val executorMinicondaDirRef = new File(minicondaPkgsPath)
+    val minicondaFiles = if (executorMinicondaDirRef.exists) FileUtils.getAllFiles(executorMinicondaDirRef) else new Array[File](0)
+    val pyfiles = minicondaFiles.filter(f => f.getName.endsWith(".py") ||
+      f.getName.endsWith(".egg") ||
+      f.getName.endsWith(".zip"))
+
+    conf.setAppName(sparkAppName)
+      .set("spark.driver.host", hostName)
+      .set("spark.submit.deployMode", "client")
+      .set("spark.hadoop.validateOutputSpecs", "false")
+      .set("spark.logConf", "true")
+      .set("spark.submit.pyFiles", pyfiles.mkString(","))
+
+
+    val master: String = if (env.master.isEmpty) {
+      "yarn"
+    } else {
+      env.master
+    }
+
+    config.mode match {
+
+      case "mesos" =>
+        conf.set("spark.executor.uri", s"http://$getNode:${config.Webserver.Port}/spark-2.2.1-bin-hadoop2.7.tgz")
+          .setJars(jars)
+          .set("spark.master", env.master)
+          .set("spark.home", s"${scala.reflect.io.File(".").toCanonical.toString}/spark-2.2.1-bin-hadoop2.7")
+
+      case "yarn" =>
+        conf.set("spark.home", config.spark.home)
+          // TODO: parameterize those
+          .setJars(Seq("executor.jar", "spark-runner.jar", "spark-runtime.jar") ++ jars)
+          .set("spark.history.kerberos.keytab", "/etc/security/keytabs/spark.headless.keytab")
+          .set("spark.driver.extraLibraryPath", "/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64")
+          .set("spark.yarn.queue", "default")
+          .set("spark.history.kerberos.principal", "none")
+
+          .set("spark.master", master)
+          .set("spark.executor.instances", config.spark.opts.getOrElse("executor.instances", "1"))
+          .set("spark.yarn.jars", s"spark/jars/*")
+          .set("spark.executor.memory", config.spark.opts.getOrElse("executor.memory", "1g"))
+          .set("spark.dynamicAllocation.enabled", "false")
+          .set("spark.eventLog.enabled", "false")
+          .set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
+          .set("hadoop.home.dir", config.YARN.hadoopHomeDir)
+
+      case _ => throw new Exception(s"mode ${config.mode} is not legal.")
+    }
+
+    if (config.spark.opts != null && config.spark.opts.nonEmpty) {
+      config.spark.opts.foreach(kv => {
+        log.info(s"Setting ${kv._1} to ${kv._2} as specified in amaterasu.properties")
+        conf.set(kv._1, kv._2)
+      })
+    }
+
+    // adding the configurations from spark.yml
+    sparkConf match {
+      case Some(cnf) =>
+        for ((key, value) <- cnf if value.isInstanceOf[String]) {
+          conf.set(key, value.toString)
+        }
+      case None =>
+    }
+
+    // setting the executor env from spark_exec.yml
+    executorEnv match {
+      case Some(executorEnvMap) =>
+        for ((key, value) <- executorEnvMap if value.isInstanceOf[String]) {
+          conf.setExecutorEnv(key, value.toString)
+        }
+      case None =>
+    }
+
+    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
+
+    sparkSession = SparkSession.builder
+      .appName(sparkAppName)
+      .master(master)
+      //.enableHiveSupport()
+      .config(conf).getOrCreate()
+
+    sparkSession.conf.getAll.foreach(x => log.info(x.toString))
+
+    val hc = sparkSession.sparkContext.hadoopConfiguration
+
+    sys.env.get("AWS_ACCESS_KEY_ID") match {
+      case None =>
+      case _ =>
+        hc.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
+        hc.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
+        hc.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
+    }
+
+    sparkSession
+  }
+}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkScalaRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkScalaRunner.scala
new file mode 100755 (executable)
index 0000000..46d3077
--- /dev/null
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner.repl
+
+import java.io.ByteArrayOutputStream
+import java.util
+
+import org.apache.amaterasu.common.execution.actions.Notifier
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.framework.spark.runtime.AmaContext
+import org.apache.amaterasu.sdk.AmaterasuRunner
+import org.apache.spark.sql.{Dataset, SparkSession}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.io.Source
+import scala.tools.nsc.interpreter.{IMain, Results}
+
+class ResHolder(var value: Any)
+
+class SparkScalaRunner(var env: Environment,
+                       var jobId: String,
+                       var interpreter: IMain,
+                       var outStream: ByteArrayOutputStream,
+                       var spark: SparkSession,
+                       var notifier: Notifier,
+                       var jars: Seq[String]) extends Logging with AmaterasuRunner {
+
+  private def scalaOptionError(msg: String): Unit = {
+    notifier.error("", msg)
+  }
+
+  override def getIdentifier = "scala"
+
+  val holder = new ResHolder(null)
+
+  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
+    val source = Source.fromString(actionSource)
+    interpretSources(source, actionName, exports.asScala.toMap)
+  }
+
+  def interpretSources(source: Source, actionName: String, exports: Map[String, String]): Unit = {
+
+    notifier.info(s"================= started action $actionName =================")
+    //notifier.info(s"exports is: $exports")
+
+    for (line <- source.getLines()) {
+
+      // ignoring empty or commented lines
+      if (!line.isEmpty && !line.trim.startsWith("*") && !line.startsWith("/")) {
+
+        outStream.reset()
+        log.debug(line)
+
+        if (line.startsWith("import")) {
+          interpreter.interpret(line)
+        }
+        else {
+
+          val intresult = interpreter.interpret(line)
+
+          val result = interpreter.prevRequestList.last.lineRep.call("$result")
+
+          // intresult: Success, Error, etc
+          // result: the actual result (RDD, df, etc.) for caching
+          // outStream.toString gives you the error message
+          intresult match {
+            case Results.Success =>
+              log.debug("Results.Success")
+
+              notifier.success(line)
+
+              val resultName = interpreter.prevRequestList.last.termNames.last
+
+              //notifier.info(s" result name ${resultName.toString}")
+              //notifier.info(s" exist in exports: ${exports.contains(resultName.toString)}")
+
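+              // if the action exports this value, persist it under the job's
+              // working directory in the requested format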
+              if (exports.contains(resultName.toString)) {
+
+                val format = exports(resultName.toString)
+
+                if (result != null) {
+
+                  result match {
+                    case _: Dataset[_] =>
+                      log.debug(s"persisting DataFrame: $resultName")
+                      val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.workingDir}/$jobId/$actionName/$resultName")"""
+                      val writeResult = interpreter.interpret(writeLine)
+                      if (writeResult != Results.Success) {
+                        val err = outStream.toString
+                        notifier.error(writeLine, err)
+                        log.error(s"error persisting dataset: $writeLine failed with: $err")
+                        //throw new Exception(err)
+                      }
+                      log.debug(s"persisted DataFrame: $resultName")
+
+                    case _ => notifier.info(s"""+++> result type ${result.getClass}""")
+                  }
+                }
+              }
+
+            case Results.Error =>
+              log.debug("Results.Error")
+              val err = outStream.toString
+              notifier.error(line, err)
+              throw new Exception(err)
+
+            case Results.Incomplete =>
+              log.debug("Results.Incomplete")
+
+          }
+        }
+      }
+    }
+
+    notifier.info(s"================= finished action $actionName =================")
+  }
+
+  def initializeAmaContext(env: Environment): Unit = {
+
+    // setting up some context :)
+    val sc = this.spark.sparkContext
+    val sqlContext = this.spark.sqlContext
+
+    interpreter.interpret("import scala.util.control.Exception._")
+    interpreter.interpret("import org.apache.spark.{ SparkContext, SparkConf }")
+    interpreter.interpret("import org.apache.spark.sql.SQLContext")
+    interpreter.interpret("import org.apache.spark.sql.{ Dataset, SparkSession }")
+    interpreter.interpret("import org.apache.spark.sql.SaveMode")
+    interpreter.interpret("import org.apache.amaterasu.framework.spark.runtime.AmaContext")
+    interpreter.interpret("import org.apache.amaterasu.common.runtime.Environment")
+
+    // creating a map (_contextStore) to hold the different spark contexts
+    // in the REPL and getting a reference to it
+    interpreter.interpret("var _contextStore = scala.collection.mutable.Map[String, AnyRef]()")
+    val contextStore = interpreter.prevRequestList.last.lineRep.call("$result").asInstanceOf[mutable.Map[String, AnyRef]]
+    AmaContext.init(spark, jobId, env)
+
+    // populating the contextStore
+    contextStore.put("sc", sc)
+    contextStore.put("sqlContext", sqlContext)
+    contextStore.put("env", env)
+    contextStore.put("spark", spark)
+    contextStore.put("ac", AmaContext)
+
+    interpreter.interpret("val sc = _contextStore(\"sc\").asInstanceOf[SparkContext]")
+    interpreter.interpret("val sqlContext = _contextStore(\"sqlContext\").asInstanceOf[SQLContext]")
+    interpreter.interpret("val env = _contextStore(\"env\").asInstanceOf[Environment]")
+    interpreter.interpret("val spark = _contextStore(\"spark\").asInstanceOf[SparkSession]")
+    interpreter.interpret("val AmaContext = _contextStore(\"ac\").asInstanceOf[AmaContext]")
+    interpreter.interpret("import sqlContext.implicits._")
+
+    // trace the AmaContext initialization (it was already performed above)
+    println(s"""AmaContext.init(sc, sqlContext ,"$jobId")""")
+
+  }
+
+}
+
+object SparkScalaRunner extends Logging {
+
+  def apply(env: Environment,
+            jobId: String,
+            spark: SparkSession,
+            outStream: ByteArrayOutputStream,
+            notifier: Notifier,
+            jars: Seq[String]): SparkScalaRunner = {
+
+    new SparkScalaRunner(env, jobId, SparkRunnerHelper.getOrCreateScalaInterpreter(outStream, jars), outStream, spark, notifier, jars)
+
+  }
+
+}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/sparkr/SparkRRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/sparkr/SparkRRunner.scala
new file mode 100644 (file)
index 0000000..390b06a
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner.sparkr
+
+import java.io.ByteArrayOutputStream
+import java.util
+
+import org.apache.amaterasu.common.execution.actions.Notifier
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.sdk.AmaterasuRunner
+import org.apache.spark.SparkContext
+
+
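+// Placeholder runner: SparkR execution is not implemented yet, so
+// executeSource is a no-op.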
+class SparkRRunner extends Logging with AmaterasuRunner {
+
+  override def getIdentifier = "spark-r"
+
+  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
+  }
+}
+
+object SparkRRunner {
+  def apply(env: Environment,
+            jobId: String,
+            sparkContext: SparkContext,
+            outStream: ByteArrayOutputStream,
+            notifier: Notifier,
+            jars: Seq[String]): SparkRRunner = {
+    new SparkRRunner()
+  }
+}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/sparksql/SparkSqlRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/framework/spark/runner/sparksql/SparkSqlRunner.scala
new file mode 100644 (file)
index 0000000..62af197
--- /dev/null
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner.sparksql
+
+import java.io.File
+import java.util
+
+import org.apache.amaterasu.common.execution.actions.Notifier
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.framework.spark.runtime.AmaContext
+import org.apache.amaterasu.sdk.AmaterasuRunner
+import org.apache.commons.io.FilenameUtils
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Amaterasu currently supports JSON and PARQUET as data sources.
+  * CSV data source support will be provided in later versions.
+  */
+class SparkSqlRunner extends Logging with AmaterasuRunner {
+  var env: Environment = _
+  var notifier: Notifier = _
+  var jobId: String = _
+  //var actionName: String = _
+  var spark: SparkSession = _
+
+  /*
+  Method: executeSource
+  Description: when the user specifies a query in the Amaterasu format, this
+               method parses and executes it; otherwise the query is executed
+               directly
+  @Params: actionSource - the query string
+   */
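+  // an "amacontext" query references a dataframe persisted by a previous
+  // action, e.g.
+  //   select * from amacontext_<actionName>_<dfName> where age > 20 readas parquet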
+  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
+
+    notifier.info(s"================= started action $actionName =================")
+
+    if (actionSource.nonEmpty) {
+
+      var result: DataFrame = null
+      if (actionSource.toLowerCase.contains("amacontext")) {
+
+        //Parse the incoming query
+        //notifier.info(s"================= parsing the SQL query =================")
+
+        val parser: List[String] = actionSource.toLowerCase.split(" ").toList
+        var sqlPart1: String = ""
+        var sqlPart2: String = ""
+        var queryTempLen: Int = 0
+
+        //get only the sql part of the query
+        for (i <- 0 to parser.indexOf("from")) {
+          sqlPart1 += parser(i) + " "
+        }
+
+        if (parser.indexOf("readas") == -1) {
+          queryTempLen = parser.length - 1
+        }
+        else
+          queryTempLen = parser.length - 3
+
+        for (i <- parser.indexOf("from") + 1 to queryTempLen) {
+          if (!parser(i).contains("amacontext"))
+            sqlPart2 += " " + parser(i)
+        }
+
+        //if no read format is specified by the user, use PARQUET as the
+        //default file format to load data
+        var fileFormat: String = null
+        if (parser.indexOf("readas") == -1) {
+          fileFormat = "parquet"
+        } else {
+          fileFormat = parser(parser.indexOf("readas") + 1)
+        }
+
+
+        val locationPath: String = parser.filter(word => word.contains("amacontext")).mkString("")
+        val directories = locationPath.split("_")
+        val actionName = directories(1)
+        val dfName = directories(2)
+        val parsedQuery = sqlPart1 + locationPath + sqlPart2
+
+        //Load the dataframe from previous action
+        val loadData: DataFrame = AmaContext.getDataFrame(actionName, dfName, fileFormat)
+        loadData.createOrReplaceTempView(locationPath)
+
+
+        try {
+          result = spark.sql(parsedQuery)
+          notifier.success(parsedQuery)
+        } catch {
+          case e: Exception => notifier.error(parsedQuery, e.getMessage)
+        }
+
+      }
+      else {
+
+        notifier.info("Executing SparkSql on: " + actionSource)
+
+        result = spark.sql(actionSource)
+      }
+      val exportsBuff = exports.asScala.toBuffer
+      if (exportsBuff.nonEmpty) {
+        val exportName = exportsBuff.head._1
+        val exportFormat = exportsBuff.head._2
+        //notifier.info(s"exporting to -> ${env.workingDir}/$jobId/$actionName/$exportName")
+        result.write.mode(SaveMode.Overwrite).format(exportFormat).save(s"${env.workingDir}/$jobId/$actionName/$exportName")
+      }
+      notifier.info(s"================= finished action $actionName =================")
+    }
+  }
+
+  /*
+  Method to find the file type of files within a directory
+  @Params
+  folderName : Path to location of the directory containing data-source files
+  */
+
+  def findFileType(folderName: File): Array[String] = {
+    // get all the files from a directory
+    val files: Array[File] = folderName.listFiles()
+    val extensions: Array[String] = files.map(file => FilenameUtils.getExtension(file.toString))
+    extensions
+  }
+
+  override def getIdentifier: String = "sql"
+
+}
+
+object SparkSqlRunner {
+
+  def apply(env: Environment,
+            jobId: String,
+            // actionName: String,
+            notifier: Notifier,
+            spark: SparkSession): SparkSqlRunner = {
+
+    val sparkSqlRunnerObj = new SparkSqlRunner
+
+    sparkSqlRunnerObj.env = env
+    sparkSqlRunnerObj.jobId = jobId
+    //sparkSqlRunnerObj.actionName = actionName
+    sparkSqlRunnerObj.notifier = notifier
+    sparkSqlRunnerObj.spark = spark
+    sparkSqlRunnerObj
+  }
+}
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv b/frameworks/spark/runner/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
new file mode 100644 (file)
index 0000000..5f0ce0e
--- /dev/null
@@ -0,0 +1,4 @@
+name,age
+sampath,22
+kirupa,30
+dev,19
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/json/SparkSqlTestData.json b/frameworks/spark/runner/src/test/resources/SparkSql/json/SparkSqlTestData.json
new file mode 100644 (file)
index 0000000..d297f1f
--- /dev/null
@@ -0,0 +1,3 @@
+{"name":"Sampath","age":22}
+{"name":"Kirupa", "age":30}
+{"name":"Dev", "age":19}]
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
new file mode 100644 (file)
index 0000000..7d66b64
Binary files /dev/null and b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
new file mode 100644 (file)
index 0000000..74b1890
Binary files /dev/null and b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_SUCCESS b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_SUCCESS
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata
new file mode 100644 (file)
index 0000000..5d83fd6
Binary files /dev/null and b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_common_metadata differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata
new file mode 100644 (file)
index 0000000..ac54053
Binary files /dev/null and b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/_metadata differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
new file mode 100644 (file)
index 0000000..e1b0d2e
Binary files /dev/null and b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet differ
diff --git a/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
new file mode 100644 (file)
index 0000000..d807ba9
Binary files /dev/null and b/frameworks/spark/runner/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet differ
diff --git a/frameworks/spark/runner/src/test/resources/amaterasu.properties b/frameworks/spark/runner/src/test/resources/amaterasu.properties
new file mode 100755 (executable)
index 0000000..d402fed
--- /dev/null
@@ -0,0 +1,9 @@
+zk=127.0.0.1
+version=0.2.0-incubating
+master=192.168.33.11
+user=root
+mode=mesos
+webserver.port=8000
+webserver.root=dist
+spark.version=2.1.1-bin-hadoop2.7
+pysparkPath=/usr/bin/python
diff --git a/frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip b/frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip
new file mode 100644 (file)
index 0000000..8c3829e
Binary files /dev/null and b/frameworks/spark/runner/src/test/resources/py4j-0.10.4-src.zip differ
diff --git a/frameworks/spark/runner/src/test/resources/py4j.tar.gz b/frameworks/spark/runner/src/test/resources/py4j.tar.gz
new file mode 100644 (file)
index 0000000..761a0af
Binary files /dev/null and b/frameworks/spark/runner/src/test/resources/py4j.tar.gz differ
diff --git a/frameworks/spark/runner/src/test/resources/pyspark-with-amacontext.py b/frameworks/spark/runner/src/test/resources/pyspark-with-amacontext.py
new file mode 100755 (executable)
index 0000000..c940eea
--- /dev/null
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+class AmaContext(object):
+
+    def __init__(self, sc, spark, job_id, env):
+        self.sc = sc
+        self.spark = spark
+        self.job_id = job_id
+        self.env = env
+
+    def get_dataframe(self, action_name, dataset_name, format = "parquet"):
+        return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
+
+class Environment(object):
+
+    def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
+        self.name = name
+        self.master = master
+        self.input_root_path = input_root_path
+        self.output_root_path = output_root_path
+        self.working_dir = working_dir
+        self.configuration = configuration
+
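+# 'ama_context' is expected to be injected into this script's scope by the
+# PySpark runner at execution time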
+data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+rdd = ama_context.sc.parallelize(data)
+odd = rdd.filter(lambda num: num % 2 != 0)
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/pyspark.tar.gz b/frameworks/spark/runner/src/test/resources/pyspark.tar.gz
new file mode 100644 (file)
index 0000000..6f25984
Binary files /dev/null and b/frameworks/spark/runner/src/test/resources/pyspark.tar.gz differ
diff --git a/frameworks/spark/runner/src/test/resources/pyspark.zip b/frameworks/spark/runner/src/test/resources/pyspark.zip
new file mode 100644 (file)
index 0000000..a624c9f
Binary files /dev/null and b/frameworks/spark/runner/src/test/resources/pyspark.zip differ
diff --git a/frameworks/spark/runner/src/test/resources/simple-pyspark.py b/frameworks/spark/runner/src/test/resources/simple-pyspark.py
new file mode 100755 (executable)
index 0000000..923f81c
--- /dev/null
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
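+# 'sc' (the SparkContext) is provided by the PySpark runner's execution scope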
+data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+try:
+    rdd = sc.parallelize(data)
+
+    def g(x):
+        print(x)
+
+    rdd.foreach(g)
+except Exception as e:
+    print(type(e), e)
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/simple-python-err.py b/frameworks/spark/runner/src/test/resources/simple-python-err.py
new file mode 100755 (executable)
index 0000000..dff1491
--- /dev/null
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+data = [1, 2, 3, 4, 5]
+1/0
+
+with open('/tmp/amatest-in.txt', 'a') as the_file:
+    the_file.write('hi there\n') # python will convert \n to os.linesep
diff --git a/frameworks/spark/runner/src/test/resources/simple-python.py b/frameworks/spark/runner/src/test/resources/simple-python.py
new file mode 100755 (executable)
index 0000000..0ac6f85
--- /dev/null
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+data = [1, 2, 3, 4, 5]
+print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
+print(data)
+
+with open('/tmp/amatest-in.txt', 'a') as the_file:
+    the_file.write('hi there\n') # python will convert \n to os.linesep
diff --git a/frameworks/spark/runner/src/test/resources/simple-spark.scala b/frameworks/spark/runner/src/test/resources/simple-spark.scala
new file mode 100755 (executable)
index 0000000..f2e49fd
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.spark.sql.{DataFrame, SaveMode}
+
+val data = Seq(1,3,4,5,6)
+
+
+val sc = AmaContext.sc
+val rdd = sc.parallelize(data)
+val sqlContext = AmaContext.spark
+val x: DataFrame = rdd.toDF()
+
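+// note: no save() is called here; the SparkScalaRunner itself persists
+// results that an action exports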
+x.write.mode(SaveMode.Overwrite)
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/resources/step-2.scala b/frameworks/spark/runner/src/test/resources/step-2.scala
new file mode 100755 (executable)
index 0000000..86fd048
--- /dev/null
@@ -0,0 +1,22 @@
+
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+val highNoDf = AmaContext.getDataFrame("start", "x").where("age > 20")
+highNoDf.show
diff --git a/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
new file mode 100644 (file)
index 0000000..e1b0d2e
Binary files /dev/null and b/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet differ
diff --git a/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
new file mode 100644 (file)
index 0000000..d807ba9
Binary files /dev/null and b/frameworks/spark/runner/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet differ
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/SparkTestsSuite.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/SparkTestsSuite.scala
new file mode 100644 (file)
index 0000000..0214568
--- /dev/null
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark
+
+import java.io.{ByteArrayOutputStream, File}
+
+import org.apache.amaterasu.common.dataobjects.ExecData
+import org.apache.amaterasu.common.execution.dependencies._
+import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.executor.common.executors.ProvidersFactory
+import org.apache.amaterasu.framework.spark.runner.RunnersLoadingTests
+import org.apache.amaterasu.framework.spark.runner.pyspark.PySparkRunnerTests
+import org.apache.amaterasu.framework.spark.runner.repl.{SparkScalaRunner, SparkScalaRunnerTests}
+import org.apache.amaterasu.framework.spark.runner.sparksql.SparkSqlRunnerTests
+import org.apache.amaterasu.utilities.TestNotifier
+import org.apache.spark.sql.SparkSession
+import org.scalatest._
+
+import scala.collection.mutable.ListBuffer
+
+
+class SparkTestsSuite extends Suites(
+  new PySparkRunnerTests,
+  new RunnersLoadingTests,
+  new SparkSqlRunnerTests,
+  new SparkScalaRunnerTests
+) with BeforeAndAfterAll {
+
+  var env: Environment = _
+  var factory: ProvidersFactory = _
+  var spark: SparkSession = _
+
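+  // simulates the miniconda directory layout that SparkRunnerHelper scans
+  // for python dependencies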
+  private def createTestMiniconda(): Unit = {
+    println(s"PATH: ${new File(".").getAbsolutePath}")
+    new File("miniconda/pkgs").mkdirs()
+  }
+
+  override def beforeAll(): Unit = {
+
+    // I can't apologise enough for this
+    val resources = new File(getClass.getResource("/spark_intp.py").getPath).getParent
+    val workDir = new File(resources).getParentFile.getParent
+
+    env = Environment()
+    env.workingDir = s"file://$workDir"
+
+    env.master = "local[1]"
+    if (env.configuration != null) {
+      env.configuration += "pysparkPath" -> "/usr/bin/python"
+    } else {
+      env.configuration = Map(
+        "pysparkPath" -> "/usr/bin/python",
+        "cwd" -> resources
+      )
+    }
+
+    createTestMiniconda()
+    // the spark exec env (PYTHONPATH -> resources) is handed to the runners
+    // through the ExecData configurations below
+    factory = ProvidersFactory(ExecData(env,
+      Dependencies(ListBuffer.empty[Repo], List.empty[Artifact]),
+      PythonDependencies(List.empty[PythonPackage]),
+      Map(
+        "spark" -> Map.empty[String, Any],
+        "spark_exec_env" -> Map("PYTHONPATH" -> resources))),
+      "test",
+      new ByteArrayOutputStream(),
+      new TestNotifier(),
+      "test",
+      "localhost",
+      getClass.getClassLoader.getResource("amaterasu.properties").getPath)
+    spark = factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner].spark
+
+    this.nestedSuites.filter(s => s.isInstanceOf[RunnersLoadingTests]).foreach(s => s.asInstanceOf[RunnersLoadingTests].factory = factory)
+    this.nestedSuites.filter(s => s.isInstanceOf[PySparkRunnerTests]).foreach(s => s.asInstanceOf[PySparkRunnerTests].factory = factory)
+    this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].factory = factory)
+    this.nestedSuites.filter(s => s.isInstanceOf[SparkScalaRunnerTests]).foreach(s => s.asInstanceOf[SparkScalaRunnerTests].factory = factory)
+    this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].env = env)
+
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
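+    // note: File.delete only removes the miniconda directory if it is empty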
+    new File("miniconda").delete()
+    spark.stop()
+
+    super.afterAll()
+  }
+
+}
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/RunnersLoadingTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/RunnersLoadingTests.scala
new file mode 100644 (file)
index 0000000..3629674
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner
+
+import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.executor.common.executors.ProvidersFactory
+import org.scalatest._
+
+@DoNotDiscover
+class RunnersLoadingTests extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  var env: Environment = _
+  var factory: ProvidersFactory = _
+
+  "RunnersFactory" should "be loaded with all the implementations of AmaterasuRunner in its classpath" in {
+    val r = factory.getRunner("spark", "scala")
+    r shouldBe defined
+  }
+}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/pyspark/PySparkRunnerTests.scala
new file mode 100755 (executable)
index 0000000..a320e56
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner.pyspark
+
+import java.io.File
+
+import org.apache.amaterasu.executor.common.executors.ProvidersFactory
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
+
+import scala.collection.JavaConverters._
+import scala.io.Source
+
+@DoNotDiscover
+class PySparkRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  Logger.getLogger("org").setLevel(Level.OFF)
+  Logger.getLogger("akka").setLevel(Level.OFF)
+  Logger.getLogger("spark").setLevel(Level.OFF)
+  Logger.getLogger("jetty").setLevel(Level.OFF)
+  Logger.getRootLogger.setLevel(Level.OFF)
+
+  var factory: ProvidersFactory = _
+
+  def delete(file: File) {
+    if (file.isDirectory)
+      Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(delete(_))
+    file.delete
+  }
+
+  override protected def afterAll(): Unit = {
+    val pysparkDir = new File(getClass.getResource("/pyspark").getPath)
+    val py4jDir = new File(getClass.getResource("/py4j").getPath)
+    delete(pysparkDir)
+    delete(py4jDir)
+    super.afterAll()
+  }
+
+
+  "PySparkRunner.executeSource" should "execute simple python code" in {
+    val src = Source.fromFile(getClass.getResource("/simple-python.py").getPath).mkString
+    var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
+    println("3333333333333333333333")
+    runner.executeSource(src, "test_action1", Map.empty[String, String].asJava)
+  }
+
+  it should "print and trows an errors" in {
+    a[java.lang.Exception] should be thrownBy {
+      val src = Source.fromFile(getClass.getResource("/simple-python-err.py").getPath).mkString
+      var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
+      runner.executeSource(src, "test_action2", Map.empty[String, String].asJava)
+    }
+  }
+
+  it should "also execute spark code written in python" in {
+    val src = Source.fromFile(getClass.getResource("/simple-pyspark.py").getPath).mkString
+    var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
+    runner.executeSource(src, "test_action3", Map("numDS" -> "parquet").asJava)
+  }
+
+  it should "also execute spark code written in python with AmaContext being used" in {
+    val src = Source.fromFile(getClass.getResource("/pyspark-with-amacontext.py").getPath).mkString
+    var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
+    runner.executeSource(src, "test_action4", Map.empty[String, String].asJava)
+  }
+
+}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkScalaRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/repl/SparkScalaRunnerTests.scala
new file mode 100755 (executable)
index 0000000..26f2ceb
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner.repl
+
+import org.apache.amaterasu.executor.common.executors.ProvidersFactory
+import org.apache.amaterasu.framework.spark.runtime.AmaContext
+import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
+
+import scala.collection.JavaConverters._
+import scala.io.Source
+
+@DoNotDiscover
+class SparkScalaRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
+  var factory: ProvidersFactory = _
+  var runner: SparkScalaRunner = _
+
+
+  "SparkScalaRunner" should "execute the simple-spark.scala" in {
+    val sparkRunner = factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
+    val script = getClass.getResource("/simple-spark.scala").getPath
+    val sourceCode = Source.fromFile(script).getLines().mkString("\n")
+    sparkRunner.executeSource(sourceCode, "start", Map.empty[String, String].asJava)
+
+  }
+
+  "SparkScalaRunner" should "execute step-2.scala and access data from simple-spark.scala" in {
+
+    val sparkRunner = factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
+    val script = getClass.getResource("/step-2.scala").getPath
+    sparkRunner.env.workingDir = getClass.getResource("/tmp").getPath
+    AmaContext.init(sparkRunner.spark, "job", sparkRunner.env)
+    val sourceCode = Source.fromFile(script).getLines().mkString("\n")
+    sparkRunner.executeSource(sourceCode, "cont", Map.empty[String, String].asJava)
+
+  }
+
+
+}
\ No newline at end of file
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/sparksql/SparkSqlRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/framework/spark/runner/sparksql/SparkSqlRunnerTests.scala
new file mode 100644 (file)
index 0000000..abb5745
--- /dev/null
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runner.sparksql
+
+import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.executor.common.executors.ProvidersFactory
+import org.apache.amaterasu.utilities.TestNotifier
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
+
+import scala.collection.JavaConverters._
+
+@DoNotDiscover
+class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  Logger.getLogger("org").setLevel(Level.OFF)
+  Logger.getLogger("akka").setLevel(Level.OFF)
+  Logger.getLogger("spark").setLevel(Level.OFF)
+  Logger.getLogger("jetty").setLevel(Level.OFF)
+  Logger.getRootLogger.setLevel(Level.OFF)
+
+
+  val notifier = new TestNotifier()
+
+  var factory: ProvidersFactory = _
+  var env: Environment = _
+
+  /*
+  Test whether parquet is used as default file format to load data from previous actions
+   */
+
+  "SparkSql" should "load data as parquet if no input foramt is specified" in {
+
+    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
+    val spark: SparkSession = sparkSql.spark
+
+    //Prepare test dataset
+    val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
+
+    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.workingDir}/${sparkSql.jobId}/sparksqldefaultparquetjobaction/sparksqldefaultparquetjobactiontempdf")
+    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqldefaultparquetjobaction_sparksqldefaultparquetjobactiontempdf where age=22", "sql_parquet_test", Map("result" -> "parquet").asJava)
+
+    val outputDf = spark.read.parquet(s"${env.workingDir}/${sparkSql.jobId}/sql_parquet_test/result")
+    println("Output Default Parquet: " + inputDf.count + "," + outputDf.first().getString(1))
+    outputDf.first().getString(1) shouldEqual "Michael"
+  }
+
+  /*
+  Test whether the parquet data is successfully parsed, loaded and processed by SparkSQL
+   */
+
+  "SparkSql" should "load PARQUET data directly from previous action's dataframe and persist the Data in working directory" in {
+
+    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
+    val spark: SparkSession = sparkSql.spark
+
+    //Prepare test dataset
+    val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
+    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.workingDir}/${sparkSql.jobId}/sparksqlparquetjobaction/sparksqlparquetjobactiontempdf")
+    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlparquetjobaction_sparksqlparquetjobactiontempdf READAS parquet", "sql_parquet_test", Map("result2" -> "parquet").asJava)
+
+    val outputDf = spark.read.parquet(s"${env.workingDir}/${sparkSql.jobId}/sql_parquet_test/result2")
+    println("Output Parquet: " + inputDf.count + "," + outputDf.count)
+    inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
+  }
+
+
+  /*
+  Test whether the JSON data is successfully parsed, loaded by SparkSQL
+  */
+
+  "SparkSql" should "load JSON data directly from previous action's dataframe and persist the Data in working directory" in {
+
+    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
+    val spark: SparkSession = sparkSql.spark
+
+    //Prepare test dataset
+    val inputDf = spark.read.json(getClass.getResource("/SparkSql/json").getPath)
+
+    inputDf.write.mode(SaveMode.Overwrite).json(s"${env.workingDir}/${sparkSql.jobId}/sparksqljsonjobaction/sparksqljsonjobactiontempdf")
+    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqljsonjobaction_sparksqljsonjobactiontempdf  where age='30' READAS json", "sql_json_test", Map("result" -> "json").asJava)
+
+    val outputDf = spark.read.json(s"${env.workingDir}/${sparkSql.jobId}/sql_json_test/result")
+    println("Output JSON: " + inputDf.count + "," + outputDf.count)
+    outputDf.first().getString(1) shouldEqual "Kirupa"
+
+  }
+
+  /*
+  Test whether the CSV data is successfully parsed and loaded by SparkSQL
+  */
+
+  "SparkSql" should "load CSV data directly from previous action's dataframe and persist the Data in working directory" in {
+
+    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
+    val spark: SparkSession = sparkSql.spark
+
+    //Prepare test dataset
+    val inputDf = spark.read.csv(getClass.getResource("/SparkSql/csv").getPath)
+    inputDf.write.mode(SaveMode.Overwrite).csv(s"${env.workingDir}/${sparkSql.jobId}/sparksqlcsvjobaction/sparksqlcsvjobactiontempdf")
+    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlcsvjobaction_sparksqlcsvjobactiontempdf READAS csv", "sql_csv_test", Map("result" -> "csv").asJava)
+
+
+    val outputDf = spark.read.csv(s"${env.workingDir}/${sparkSql.jobId}/sql_csv_test/result")
+    println("Output CSV: " + inputDf.count + "," + outputDf.count)
+    inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
+  }
+
+  /*
+  Test whether data can be read directly from a file and queried by SparkSQL
+  */
+//  "SparkSql" should "load data directly from a file and persist the Data in working directory" in {
+//
+//    val tempFileEnv = Environment()
+//    tempFileEnv.workingDir = "file:/tmp/"
+//    AmaContext.init(spark, "sparkSqlFileJob", tempFileEnv)
+//
+//    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlFileJob", notifier, spark)
+//    sparkSql.executeSource("SELECT * FROM parquet.`" + getClass.getResource("/SparkSql/parquet").getPath + "`", "sql_parquet_file_test", Map("result" -> "parquet").asJava)
+//    val outputParquetDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
+//    println("Output Parquet dataframe: " + outputParquetDf.show)
+//    outputParquetDf.first().getString(1) shouldEqual "Michael"
+//    sparkSql.executeSource("SELECT * FROM json.`" + getClass.getResource("/SparkSql/json").getPath + "`","sql_parquet_file_test", Map("result" -> "json").asJava)
+//
+//    val outputJsonDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
+//    println("Output Json dataframe: " + outputJsonDf.show)
+//    outputJsonDf.first().getString(1) shouldEqual "Sampath"
+//
+//  }
+
+
+}
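Note on the tests above: they rely on the AMACONTEXT_<actionName>_<dfName> naming convention, under which a table reference resolves to <workingDir>/<jobId>/<actionName>/<dfName> in the job's working directory. A minimal sketch of that resolution, assuming neither segment contains an underscore (the helper below is illustrative, not the runner's actual implementation):

    // Hypothetical helper: maps an AMACONTEXT table reference to the
    // working-directory path the tests write to and read back from.
    def amaContextPath(workingDir: String, jobId: String, tableRef: String): String = {
      // tableRef has the form AMACONTEXT_<actionName>_<dfName>
      val Array(_, actionName, dfName) = tableRef.split("_", 3)
      s"$workingDir/$jobId/$actionName/$dfName"
    }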
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
new file mode 100644 (file)
index 0000000..16cb97b
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.utilities
+
+import org.apache.amaterasu.common.execution.actions.Notifier
+import org.apache.amaterasu.common.logging.Logging
+
+
+class TestNotifier extends Notifier with Logging {
+
+  override def info(msg: String): Unit = {
+    log.info(msg)
+  }
+
+  override def success(line: String): Unit = {
+    log.info(s"successfully executed line: $line")
+  }
+
+  override def error(line: String, msg: String): Unit = {
+    log.error(s"Error executing line: $line message: $msg")
+  }
+}
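The Notifier methods above are the hooks a runner fires as it executes source. A hedged sketch of the call pattern (the loop below is illustrative, not taken from this commit):

    // Illustrative runner loop showing when each Notifier hook fires
    def runLines(lines: Seq[String], notifier: Notifier): Unit = {
      lines.foreach { line =>
        try {
          // ... evaluate the line against the interpreter ...
          notifier.success(line)
        } catch {
          case e: Exception => notifier.error(line, e.getMessage)
        }
      }
    }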
diff --git a/frameworks/spark/runtime/build.gradle b/frameworks/spark/runtime/build.gradle
new file mode 100644 (file)
index 0000000..9bba2e4
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+    id 'com.github.johnrengelman.shadow' version '1.2.4'
+    id 'com.github.maiflai.scalatest' version '0.6-5-g9065d91'
+    id 'scala'
+    id 'java'
+}
+
+shadowJar {
+    zip64 true
+}
+
+repositories {
+    maven {
+        url "https://plugins.gradle.org/m2/"
+    }
+    mavenCentral()
+}
+
+test {
+    maxParallelForks = 1
+    forkEvery = 1
+}
+
+configurations {
+    provided
+    runtime.exclude module: 'hadoop-common'
+    runtime.exclude module: 'hadoop-yarn-api'
+    runtime.exclude module: 'hadoop-yarn-client'
+    runtime.exclude module: 'hadoop-hdfs'
+    runtime.exclude module: 'mesos'
+    runtime.exclude module: 'scala-compiler'
+}
+
+sourceSets {
+    main {
+        compileClasspath += configurations.provided
+        scala {
+            srcDirs = ['src/main/scala', 'src/main/java']
+        }
+        java {
+            srcDirs = []
+        }
+    }
+    test {
+        compileClasspath += configurations.provided
+        runtimeClasspath += configurations.provided
+        resources.srcDirs += [file('src/test/resources')]
+    }
+}
+
+dependencies {
+
+    compile project(':executor')
+    provided('org.apache.spark:spark-repl_2.11:2.2.1')
+    provided('org.apache.spark:spark-core_2.11:2.2.1')
+
+}
+
+task copyToHome(type: Copy) {
+    from 'build/libs'
+    from 'build/resources/main'
+    into '../../../build/amaterasu/dist'
+}
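Two notes on the build above: the provided configuration keeps the Spark artifacts off the runtime classpath (the cluster is expected to supply them) while the sourceSets additions make them visible to compilation and tests, and copyToHome stages the built jar plus main resources into the distribution directory. Assuming the Gradle project path mirrors the directory layout (an assumption, not confirmed by this commit), the staging step would be invoked as:

    ./gradlew :frameworks:spark:runtime:build :frameworks:spark:runtime:copyToHome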
diff --git a/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/framework/spark/runtime/AmaContext.scala b/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/framework/spark/runtime/AmaContext.scala
new file mode 100644 (file)
index 0000000..cb2eccc
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.framework.spark.runtime
+
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.amaterasu.common.runtime.Environment
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, Encoder, SparkSession}
+
+object AmaContext extends Logging {
+
+  var spark: SparkSession = _
+  var sc: SparkContext = _
+  var jobId: String = _
+  var env: Environment = _
+
+  def init(spark: SparkSession,
+           jobId: String,
+           env: Environment): Unit = {
+
+    AmaContext.spark = spark
+    AmaContext.sc = spark.sparkContext
+    AmaContext.jobId = jobId
+    AmaContext.env = env
+
+  }
+
+  def getDataFrame(actionName: String, dfName: String, format: String = "parquet"): DataFrame = {
+    spark.read.format(format).load(s"${env.workingDir}/$jobId/$actionName/$dfName")
+  }
+
+  def getDataset[T: Encoder](actionName: String, dfName: String, format: String = "parquet"): Dataset[T] = {
+    getDataFrame(actionName, dfName, format).as[T]
+  }
+
+}
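AmaContext is the runtime entry point user code sees: the framework calls init once per job, after which any action can pull a dataframe persisted by an earlier action. A short usage sketch (the action and dataframe names are illustrative):

    import org.apache.amaterasu.framework.spark.runtime.AmaContext
    import org.apache.spark.sql.DataFrame

    // Read the dataframe a previous action named "start" persisted as "odd";
    // parquet is the default format, per getDataFrame above.
    val odd: DataFrame = AmaContext.getDataFrame("start", "odd")
    odd.show()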
index 85f3050..9fdd83c 100644 (file)
@@ -1,4 +1,4 @@
-#Sun Jun 24 01:04:07 SGT 2018
+#Thu Jun 28 13:07:02 SGT 2018
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME