AMATERASU-24 Refactor Spark out of Amaterasu executor to its own project
author Arun Manivannan <arun@arunma.com>
Sat, 23 Jun 2018 16:43:07 +0000 (00:43 +0800)
committer Arun Manivannan <arun@arunma.com>
Sat, 23 Jun 2018 16:43:07 +0000 (00:43 +0800)
56 files changed:
build.gradle
executor/build.gradle
executor/src/main/java/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkEntryPoint.java [deleted file]
executor/src/main/resources/codegen.py [deleted file]
executor/src/main/resources/runtime.py [deleted file]
executor/src/main/resources/spark_intp.py [deleted file]
executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkExecutionQueue.scala [deleted file]
executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkResultQueue.scala [deleted file]
executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala [deleted file]
executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/ResultQueue.scala [deleted file]
executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRRunner.scala [deleted file]
executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala [deleted file]
executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala [deleted file]
executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
executor/src/main/scala/org/apache/amaterasu/executor/runtime/AmaContext.scala [deleted file]
executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
executor/src/main/scala/org/apache/spark/repl/amaterasu/AmaSparkILoop.scala [deleted file]
executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala [deleted file]
executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala [deleted file]
executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv [deleted file]
executor/src/test/resources/SparkSql/json/SparkSqlTestData.json [deleted file]
executor/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc [deleted file]
executor/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc [deleted file]
executor/src/test/resources/SparkSql/parquet/_SUCCESS [deleted file]
executor/src/test/resources/SparkSql/parquet/_common_metadata [deleted file]
executor/src/test/resources/SparkSql/parquet/_metadata [deleted file]
executor/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet [deleted file]
executor/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet [deleted file]
executor/src/test/resources/amaterasu.properties [deleted file]
executor/src/test/resources/py4j-0.10.4-src.zip [deleted file]
executor/src/test/resources/py4j.tar.gz [deleted file]
executor/src/test/resources/pyspark-with-amacontext.py [deleted file]
executor/src/test/resources/pyspark.tar.gz [deleted file]
executor/src/test/resources/pyspark.zip [deleted file]
executor/src/test/resources/simple-pyspark.py [deleted file]
executor/src/test/resources/simple-python-err.py [deleted file]
executor/src/test/resources/simple-python.py [deleted file]
executor/src/test/resources/simple-spark.scala [deleted file]
executor/src/test/resources/step-2.scala [deleted file]
executor/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet [deleted file]
executor/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet [deleted file]
executor/src/test/scala/org/apache/amaterasu/RunnersTests/RunnersLoadingTests.scala [deleted file]
executor/src/test/scala/org/apache/amaterasu/spark/PySparkRunnerTests.scala [deleted file]
executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala [deleted file]
executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala [deleted file]
executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala [deleted file]
executor/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala [deleted file]
frameworks/spark/runner/src/test/resources/codegen.py [moved from executor/src/test/resources/codegen.py with 100% similarity]
frameworks/spark/runner/src/test/resources/runtime.py [moved from executor/src/test/resources/runtime.py with 100% similarity]
frameworks/spark/runner/src/test/resources/spark-version-info.properties [moved from executor/src/main/resources/spark-version-info.properties with 100% similarity]
frameworks/spark/runner/src/test/resources/spark_intp.py [moved from executor/src/test/resources/spark_intp.py with 100% similarity]
leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
leader/src/main/scripts/ama-start-mesos.sh
leader/src/main/scripts/ama-start-yarn.sh
leader/src/main/scripts/amaterasu.properties
settings.gradle

diff --git a/build.gradle b/build.gradle
index 0f11347..00e44ea 100644 (file)
@@ -25,10 +25,6 @@ allprojects {
     version '0.2.0-incubating-rc4'
 }
 
-project(':leader')
-project(':common')
-project(':executor')
-
 task copyLeagalFiles(type: Copy) {
     from "./DISCLAIMER", "./LICENSE", "./NOTICE"
     into "${buildDir}/amaterasu"
diff --git a/executor/build.gradle b/executor/build.gradle
index 21bc2b0..09e269c 100644 (file)
@@ -54,7 +54,6 @@ dependencies {
 
     compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8'
     compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
-    compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8'
     compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final'
     compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
     compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
@@ -75,18 +74,7 @@ dependencies {
     compile project(':common')
     compile project(':amaterasu-sdk')
 
-    //runtime dependency for spark
-    provided('org.apache.spark:spark-repl_2.11:2.2.1')
-    provided('org.apache.spark:spark-core_2.11:2.2.1')
-
-    testCompile project(':common')
-    testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
-    testRuntime 'org.pegdown:pegdown:1.1.0'
-    testCompile 'junit:junit:4.11'
-    testCompile 'org.scalatest:scalatest_2.11:3.0.2'
-    testCompile 'org.scala-lang:scala-library:2.11.8'
-    testCompile('org.apache.spark:spark-repl_2.11:2.2.1')
-    testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9'
+
 
 }
 
diff --git a/executor/src/main/java/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkEntryPoint.java b/executor/src/main/java/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkEntryPoint.java
deleted file mode 100755 (executable)
index a521fce..0000000
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark;
-
-import org.apache.amaterasu.executor.runtime.AmaContext;
-import org.apache.amaterasu.common.runtime.Environment;
-
-import org.apache.spark.SparkEnv;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-
-import org.apache.spark.sql.SparkSession;
-import py4j.GatewayServer;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class PySparkEntryPoint {
-
-    //private static Boolean started = false;
-    private static PySparkExecutionQueue queue = new PySparkExecutionQueue();
-    private static ConcurrentHashMap<String, ResultQueue> resultQueues = new ConcurrentHashMap<>();
-
-    private static int port = 0;
-    private static SparkSession sparkSession = null;
-    private static JavaSparkContext jsc = null;
-    private static SQLContext sqlContext = null;
-    private static SparkEnv sparkEnv = null;
-
-    public static PySparkExecutionQueue getExecutionQueue() {
-        return queue;
-    }
-
-    public static ResultQueue getResultQueue(String actionName) {
-        resultQueues.putIfAbsent(actionName, new ResultQueue());
-        return resultQueues.get(actionName);
-    }
-
-    public static JavaSparkContext getJavaSparkContext() {
-        SparkEnv.set(sparkEnv);
-        return jsc;
-    }
-
-    public static String getJobId(){
-        return AmaContext.jobId();
-    }
-
-    public static Environment getEnv(){
-        return AmaContext.env();
-    }
-
-    public static SQLContext getSqlContext() {
-        SparkEnv.set(sparkEnv);
-        return sqlContext;
-    }
-
-    public static SparkSession getSparkSession() {
-        SparkEnv.set(sparkEnv);
-        return sparkSession;
-    }
-
-    public static SparkConf getSparkConf() {
-        return jsc.getConf();
-    }
-
-    private static void generatePort() {
-
-        try {
-
-            ServerSocket socket = new ServerSocket(0);
-            port = socket.getLocalPort();
-
-            socket.close();
-
-        } catch (IOException e) {
-        }
-
-    }
-
-    public static int getPort() {
-        return port;
-    }
-
-    public static void start(SparkSession spark,
-                             String jobName,
-                             Environment env,
-                             SparkEnv sparkEnv) {
-
-        AmaContext.init(spark, jobName, env);
-
-        sparkSession = spark;
-        jsc = new JavaSparkContext(spark.sparkContext());
-        sqlContext = spark.sqlContext();
-        PySparkEntryPoint.sparkEnv = sparkEnv;
-        generatePort();
-        GatewayServer gatewayServer = new GatewayServer(new PySparkEntryPoint(), port);
-
-        gatewayServer.start();
-    }
-}
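
The deleted entry point allocates the Py4J gateway port by briefly binding a ServerSocket to port 0 (generatePort above silently swallows the IOException). A minimal Scala sketch of that pattern, with a hypothetical helper name:

    import java.net.ServerSocket

    object FreePort {
      // Bind to port 0 so the OS assigns an ephemeral port, then release it again.
      // Small race: another process could grab the port before the Py4J
      // GatewayServer binds it, which the original code also accepts.
      def apply(): Int = {
        val socket = new ServerSocket(0)
        try socket.getLocalPort
        finally socket.close()
      }
    }
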
diff --git a/executor/src/main/resources/codegen.py b/executor/src/main/resources/codegen.py
deleted file mode 100644 (file)
index 113d9be..0000000
+++ /dev/null
@@ -1,577 +0,0 @@
-"""
-    codegen
-    ~~~~~~~
-
-    Extension to ast that allow ast -> python code generation.
-
-    :copyright: Copyright 2008 by Armin Ronacher.
-    :license: BSD.
-"""
-from ast import *
-
-BINOP_SYMBOLS = {}
-BINOP_SYMBOLS[Add] = '+'
-BINOP_SYMBOLS[Sub] = '-'
-BINOP_SYMBOLS[Mult] = '*'
-BINOP_SYMBOLS[Div] = '/'
-BINOP_SYMBOLS[Mod] = '%'
-BINOP_SYMBOLS[Pow] = '**'
-BINOP_SYMBOLS[LShift] = '<<'
-BINOP_SYMBOLS[RShift] = '>>'
-BINOP_SYMBOLS[BitOr] = '|'
-BINOP_SYMBOLS[BitXor] = '^'
-BINOP_SYMBOLS[BitAnd] = '&'
-BINOP_SYMBOLS[FloorDiv] = '//'
-
-BOOLOP_SYMBOLS = {}
-BOOLOP_SYMBOLS[And] = 'and'
-BOOLOP_SYMBOLS[Or] = 'or'
-
-CMPOP_SYMBOLS = {}
-CMPOP_SYMBOLS[Eq] = '=='
-CMPOP_SYMBOLS[NotEq] = '!='
-CMPOP_SYMBOLS[Lt] = '<'
-CMPOP_SYMBOLS[LtE] = '<='
-CMPOP_SYMBOLS[Gt] = '>'
-CMPOP_SYMBOLS[GtE] = '>='
-CMPOP_SYMBOLS[Is] = 'is'
-CMPOP_SYMBOLS[IsNot] = 'is not'
-CMPOP_SYMBOLS[In] = 'in'
-CMPOP_SYMBOLS[NotIn] = 'not in'
-
-UNARYOP_SYMBOLS = {}
-UNARYOP_SYMBOLS[Invert] = '~'
-UNARYOP_SYMBOLS[Not] = 'not'
-UNARYOP_SYMBOLS[UAdd] = '+'
-UNARYOP_SYMBOLS[USub] = '-'
-
-
-def to_source(node, indent_with=' ' * 4, add_line_information=False):
-    """This function can convert a node tree back into python sourcecode.
-    This is useful for debugging purposes, especially if you're dealing with
-    custom asts not generated by python itself.
-
-    It could be that the sourcecode is evaluable when the AST itself is not
-    compilable / evaluable.  The reason for this is that the AST contains some
-    more data than regular sourcecode does, which is dropped during
-    conversion.
-
-    Each level of indentation is replaced with `indent_with`.  Per default this
-    parameter is equal to four spaces as suggested by PEP 8, but it might be
-    adjusted to match the application's styleguide.
-
-    If `add_line_information` is set to `True` comments for the line numbers
-    of the nodes are added to the output.  This can be used to spot wrong line
-    number information of statement nodes.
-    """
-    generator = SourceGenerator(indent_with, add_line_information)
-    generator.visit(node)
-
-    return ''.join(generator.result)
-
-class SourceGenerator(NodeVisitor):
-    """This visitor is able to transform a well formed syntax tree into python
-    sourcecode.  For more details have a look at the docstring of the
-    `node_to_source` function.
-    """
-
-    def __init__(self, indent_with, add_line_information=False):
-        self.result = []
-        self.indent_with = indent_with
-        self.add_line_information = add_line_information
-        self.indentation = 0
-        self.new_lines = 0
-
-    def write(self, x):
-        if self.new_lines:
-            if self.result:
-                self.result.append('\n' * self.new_lines)
-            self.result.append(self.indent_with * self.indentation)
-            self.new_lines = 0
-        self.result.append(x)
-
-    def newline(self, node=None, extra=0):
-        self.new_lines = max(self.new_lines, 1 + extra)
-        if node is not None and self.add_line_information:
-            self.write('# line: %s' % node.lineno)
-            self.new_lines = 1
-
-    def body(self, statements):
-        self.new_line = True
-        self.indentation += 1
-        for stmt in statements:
-            self.visit(stmt)
-        self.indentation -= 1
-
-    def body_or_else(self, node):
-        self.body(node.body)
-        if node.orelse:
-            self.newline()
-            self.write('else:')
-            self.body(node.orelse)
-
-    def signature(self, node):
-        want_comma = []
-        def write_comma():
-            if want_comma:
-                self.write(', ')
-            else:
-                want_comma.append(True)
-
-        padding = [None] * (len(node.args) - len(node.defaults))
-        for arg, default in zip(node.args, padding + node.defaults):
-            write_comma()
-            self.visit(arg)
-            if default is not None:
-                self.write('=')
-                self.visit(default)
-        if node.vararg is not None:
-            write_comma()
-            self.write('*' + node.vararg)
-        if node.kwarg is not None:
-            write_comma()
-            self.write('**' + node.kwarg)
-
-    def decorators(self, node):
-        for decorator in node.decorator_list:
-            self.newline(decorator)
-            self.write('@')
-            self.visit(decorator)
-
-    # Statements
-
-    def visit_Assert(self, node):
-        self.newline(node)
-        self.write('assert ')
-        self.visit(node.test)
-        if node.msg is not None:
-           self.write(', ')
-           self.visit(node.msg)
-
-    def visit_Assign(self, node):
-        self.newline(node)
-        for idx, target in enumerate(node.targets):
-            if idx:
-                self.write(', ')
-            self.visit(target)
-        self.write(' = ')
-        self.visit(node.value)
-
-    def visit_AugAssign(self, node):
-        self.newline(node)
-        self.visit(node.target)
-        self.write(' ' + BINOP_SYMBOLS[type(node.op)] + '= ')
-        self.visit(node.value)
-
-    def visit_ImportFrom(self, node):
-        self.newline(node)
-        self.write('from %s%s import ' % ('.' * node.level, node.module))
-        for idx, item in enumerate(node.names):
-            if idx:
-                self.write(', ')
-            self.write(item)
-
-    def visit_Import(self, node):
-        self.newline(node)
-        for item in node.names:
-            self.write('import ')
-            self.visit(item)
-
-    def visit_Expr(self, node):
-        self.newline(node)
-        self.generic_visit(node)
-
-    def visit_FunctionDef(self, node):
-        self.newline(extra=1)
-        self.decorators(node)
-        self.newline(node)
-        self.write('def %s(' % node.name)
-        self.visit(node.args)
-        self.write('):')
-        self.body(node.body)
-
-    def visit_ClassDef(self, node):
-        have_args = []
-        def paren_or_comma():
-            if have_args:
-                self.write(', ')
-            else:
-                have_args.append(True)
-                self.write('(')
-
-        self.newline(extra=2)
-        self.decorators(node)
-        self.newline(node)
-        self.write('class %s' % node.name)
-        for base in node.bases:
-            paren_or_comma()
-            self.visit(base)
-        # XXX: the if here is used to keep this module compatible
-        #      with python 2.6.
-        if hasattr(node, 'keywords'):
-            for keyword in node.keywords:
-                paren_or_comma()
-                self.write(keyword.arg + '=')
-                self.visit(keyword.value)
-            if node.starargs is not None:
-                paren_or_comma()
-                self.write('*')
-                self.visit(node.starargs)
-            if node.kwargs is not None:
-                paren_or_comma()
-                self.write('**')
-                self.visit(node.kwargs)
-        self.write(have_args and '):' or ':')
-        self.body(node.body)
-
-    def visit_If(self, node):
-        self.newline(node)
-        self.write('if ')
-        self.visit(node.test)
-        self.write(':')
-        self.body(node.body)
-        while True:
-            else_ = node.orelse
-            if len(else_) == 0:
-                break
-            elif len(else_) == 1 and isinstance(else_[0], If):
-                node = else_[0]
-                self.newline()
-                self.write('elif ')
-                self.visit(node.test)
-                self.write(':')
-                self.body(node.body)
-            else:
-                self.newline()
-                self.write('else:')
-                self.body(else_)
-                break
-
-    def visit_For(self, node):
-        self.newline(node)
-        self.write('for ')
-        self.visit(node.target)
-        self.write(' in ')
-        self.visit(node.iter)
-        self.write(':')
-        self.body_or_else(node)
-
-    def visit_While(self, node):
-        self.newline(node)
-        self.write('while ')
-        self.visit(node.test)
-        self.write(':')
-        self.body_or_else(node)
-
-    def visit_With(self, node):
-        self.newline(node)
-        self.write('with ')
-        self.visit(node.context_expr)
-        if node.optional_vars is not None:
-            self.write(' as ')
-            self.visit(node.optional_vars)
-        self.write(':')
-        self.body(node.body)
-
-    def visit_Pass(self, node):
-        self.newline(node)
-        self.write('pass')
-
-    def visit_Print(self, node):
-        # XXX: python 2.6 only
-        self.newline(node)
-        self.write('print ')
-        want_comma = False
-        if node.dest is not None:
-            self.write(' >> ')
-            self.visit(node.dest)
-            want_comma = True
-        for value in node.values:
-            if want_comma:
-                self.write(', ')
-            self.visit(value)
-            want_comma = True
-        if not node.nl:
-            self.write(',')
-
-    def visit_Delete(self, node):
-        self.newline(node)
-        self.write('del ')
-        for idx, target in enumerate(node):
-            if idx:
-                self.write(', ')
-            self.visit(target)
-
-    def visit_TryExcept(self, node):
-        self.newline(node)
-        self.write('try:')
-        self.body(node.body)
-        for handler in node.handlers:
-            self.visit(handler)
-
-    def visit_TryFinally(self, node):
-        self.newline(node)
-        self.write('try:')
-        self.body(node.body)
-        self.newline(node)
-        self.write('finally:')
-        self.body(node.finalbody)
-
-    def visit_Global(self, node):
-        self.newline(node)
-        self.write('global ' + ', '.join(node.names))
-
-    def visit_Nonlocal(self, node):
-        self.newline(node)
-        self.write('nonlocal ' + ', '.join(node.names))
-
-    def visit_Return(self, node):
-        self.newline(node)
-        if node.value is None:
-            self.write('return')
-        else:
-            self.write('return ')
-            self.visit(node.value)
-
-    def visit_Break(self, node):
-        self.newline(node)
-        self.write('break')
-
-    def visit_Continue(self, node):
-        self.newline(node)
-        self.write('continue')
-
-    def visit_Raise(self, node):
-        # XXX: Python 2.6 / 3.0 compatibility
-        self.newline(node)
-        self.write('raise')
-        if hasattr(node, 'exc') and node.exc is not None:
-            self.write(' ')
-            self.visit(node.exc)
-            if node.cause is not None:
-                self.write(' from ')
-                self.visit(node.cause)
-        elif hasattr(node, 'type') and node.type is not None:
-            self.visit(node.type)
-            if node.inst is not None:
-                self.write(', ')
-                self.visit(node.inst)
-            if node.tback is not None:
-                self.write(', ')
-                self.visit(node.tback)
-
-    # Expressions
-
-    def visit_Attribute(self, node):
-        self.visit(node.value)
-        self.write('.' + node.attr)
-
-    def visit_Call(self, node):
-        want_comma = []
-        def write_comma():
-            if want_comma:
-                self.write(', ')
-            else:
-                want_comma.append(True)
-
-        self.visit(node.func)
-        self.write('(')
-        for arg in node.args:
-            write_comma()
-            self.visit(arg)
-        for keyword in node.keywords:
-            write_comma()
-            self.write(keyword.arg + '=')
-            self.visit(keyword.value)
-        if node.starargs is not None:
-            write_comma()
-            self.write('*')
-            self.visit(node.starargs)
-        if node.kwargs is not None:
-            write_comma()
-            self.write('**')
-            self.visit(node.kwargs)
-        self.write(')')
-
-    def visit_Name(self, node):
-        self.write(node.id)
-
-    def visit_Str(self, node):
-        self.write(repr(node.s))
-
-    def visit_Bytes(self, node):
-        self.write(repr(node.s))
-
-    def visit_Num(self, node):
-        self.write(repr(node.n))
-
-    def visit_Tuple(self, node):
-        self.write('(')
-        idx = -1
-        for idx, item in enumerate(node.elts):
-            if idx:
-                self.write(', ')
-            self.visit(item)
-        self.write(idx and ')' or ',)')
-
-    def sequence_visit(left, right):
-        def visit(self, node):
-            self.write(left)
-            for idx, item in enumerate(node.elts):
-                if idx:
-                    self.write(', ')
-                self.visit(item)
-            self.write(right)
-        return visit
-
-    visit_List = sequence_visit('[', ']')
-    visit_Set = sequence_visit('{', '}')
-    del sequence_visit
-
-    def visit_Dict(self, node):
-        self.write('{')
-        for idx, (key, value) in enumerate(zip(node.keys, node.values)):
-            if idx:
-                self.write(', ')
-            self.visit(key)
-            self.write(': ')
-            self.visit(value)
-        self.write('}')
-
-    def visit_BinOp(self, node):
-        self.visit(node.left)
-        self.write(' %s ' % BINOP_SYMBOLS[type(node.op)])
-        self.visit(node.right)
-
-    def visit_BoolOp(self, node):
-        self.write('(')
-        for idx, value in enumerate(node.values):
-            if idx:
-                self.write(' %s ' % BOOLOP_SYMBOLS[type(node.op)])
-            self.visit(value)
-        self.write(')')
-
-    def visit_Compare(self, node):
-        self.write('(')
-        self.visit(node.left)
-        for op, right in zip(node.ops, node.comparators):
-            self.write(' %s ' % CMPOP_SYMBOLS[type(op)])
-            self.visit(right)
-        self.write(')')
-
-    def visit_UnaryOp(self, node):
-        self.write('(')
-        op = UNARYOP_SYMBOLS[type(node.op)]
-        self.write(op)
-        if op == 'not':
-            self.write(' ')
-        self.visit(node.operand)
-        self.write(')')
-
-    def visit_Subscript(self, node):
-        self.visit(node.value)
-        self.write('[')
-        self.visit(node.slice)
-        self.write(']')
-
-    def visit_Slice(self, node):
-        if node.lower is not None:
-            self.visit(node.lower)
-        self.write(':')
-        if node.upper is not None:
-            self.visit(node.upper)
-        if node.step is not None:
-            self.write(':')
-            if not (isinstance(node.step, Name) and node.step.id == 'None'):
-                self.visit(node.step)
-
-    def visit_ExtSlice(self, node):
-        for idx, item in node.dims:
-            if idx:
-                self.write(', ')
-            self.visit(item)
-
-    def visit_Yield(self, node):
-        self.write('yield ')
-        self.visit(node.value)
-
-    def visit_Lambda(self, node):
-        self.write('lambda ')
-        self.visit(node.args)
-        self.write(': ')
-        self.visit(node.body)
-
-    def visit_Ellipsis(self, node):
-        self.write('Ellipsis')
-
-    def generator_visit(left, right):
-        def visit(self, node):
-            self.write(left)
-            self.visit(node.elt)
-            for comprehension in node.generators:
-                self.visit(comprehension)
-            self.write(right)
-        return visit
-
-    visit_ListComp = generator_visit('[', ']')
-    visit_GeneratorExp = generator_visit('(', ')')
-    visit_SetComp = generator_visit('{', '}')
-    del generator_visit
-
-    def visit_DictComp(self, node):
-        self.write('{')
-        self.visit(node.key)
-        self.write(': ')
-        self.visit(node.value)
-        for comprehension in node.generators:
-            self.visit(comprehension)
-        self.write('}')
-
-    def visit_IfExp(self, node):
-        self.visit(node.body)
-        self.write(' if ')
-        self.visit(node.test)
-        self.write(' else ')
-        self.visit(node.orelse)
-
-    def visit_Starred(self, node):
-        self.write('*')
-        self.visit(node.value)
-
-    def visit_Repr(self, node):
-        # XXX: python 2.6 only
-        self.write('`')
-        self.visit(node.value)
-        self.write('`')
-
-    # Helper Nodes
-
-    def visit_alias(self, node):
-        self.write(node.name)
-        if node.asname is not None:
-            self.write(' as ' + node.asname)
-
-    def visit_comprehension(self, node):
-        self.write(' for ')
-        self.visit(node.target)
-        self.write(' in ')
-        self.visit(node.iter)
-        if node.ifs:
-            for if_ in node.ifs:
-                self.write(' if ')
-                self.visit(if_)
-
-    def visit_excepthandler(self, node):
-        self.newline(node)
-        self.write('except')
-        if node.type is not None:
-            self.write(' ')
-            self.visit(node.type)
-            if node.name is not None:
-                self.write(' as ')
-                self.visit(node.name)
-        self.write(':')
-        self.body(node.body)
-
-    def visit_arguments(self, node):
-        self.signature(node)
diff --git a/executor/src/main/resources/runtime.py b/executor/src/main/resources/runtime.py
deleted file mode 100644 (file)
index d01664c..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class AmaContext(object):
-
-    def __init__(self, sc, spark, job_id, env):
-        self.sc = sc
-        self.spark = spark
-        self.job_id = job_id
-        self.env = env
-
-    def get_dataframe(self, action_name, dataset_name, format = "parquet"):
-        return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
-
-class Environment(object):
-
-    def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
-        self.name = name
-        self.master = master
-        self.input_root_path = input_root_path
-        self.output_root_path = output_root_path
-        self.working_dir = working_dir
-        self.configuration = configuration
diff --git a/executor/src/main/resources/spark_intp.py b/executor/src/main/resources/spark_intp.py
deleted file mode 100755 (executable)
index f3c9fc0..0000000
+++ /dev/null
@@ -1,110 +0,0 @@
-#!/usr/bin/python
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import ast
-import codegen
-import os
-import sys
-import zipimport
-sys.path.append(os.getcwd())
-from runtime import AmaContext, Environment
-
-# os.chdir(os.getcwd() + '/build/resources/test/')
-# import zipfile
-# zip = zipfile.ZipFile('pyspark.zip')
-# zip.extractall()
-# zip = zipfile.ZipFile('py4j-0.10.4-src.zip', 'r')
-# zip.extractall()
-# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
-# sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
-
-# py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
-# py4j_importer = zipimport.zipimporter(py4j_path)
-# py4j = py4j_importer.load_module('py4j')
-from py4j.java_gateway import JavaGateway, GatewayClient, java_import
-from py4j.protocol import Py4JJavaError
-from pyspark.conf import SparkConf
-from pyspark.context import SparkContext
-from pyspark.rdd import RDD
-from pyspark.files import SparkFiles
-from pyspark.storagelevel import StorageLevel
-from pyspark import accumulators
-from pyspark.accumulators import Accumulator, AccumulatorParam
-from pyspark.broadcast import Broadcast
-from pyspark.serializers import MarshalSerializer, PickleSerializer
-from pyspark.sql import SparkSession
-from pyspark.sql import Row
-
-client = GatewayClient(port=int(sys.argv[1]))
-gateway = JavaGateway(client, auto_convert=True)
-entry_point = gateway.entry_point
-queue = entry_point.getExecutionQueue()
-
-java_import(gateway.jvm, "org.apache.spark.SparkEnv")
-java_import(gateway.jvm, "org.apache.spark.SparkConf")
-java_import(gateway.jvm, "org.apache.spark.api.java.*")
-java_import(gateway.jvm, "org.apache.spark.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
-java_import(gateway.jvm, "org.apache.spark.sql.*")
-java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
-java_import(gateway.jvm, "scala.Tuple2")
-
-jconf = entry_point.getSparkConf()
-jsc = entry_point.getJavaSparkContext()
-
-job_id = entry_point.getJobId()
-javaEnv = entry_point.getEnv()
-
-env = Environment(javaEnv.name(), javaEnv.master(), javaEnv.inputRootPath(), javaEnv.outputRootPath(), javaEnv.workingDir(), javaEnv.configuration())
-conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf)
-conf.setExecutorEnv('PYTHONPATH', ':'.join(sys.path))
-sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-spark = SparkSession(sc, entry_point.getSparkSession())
-
-ama_context = AmaContext(sc, spark, job_id, env)
-
-while True:
-    actionData = queue.getNext()
-    resultQueue = entry_point.getResultQueue(actionData._2())
-    actionSource = actionData._1()
-    tree = ast.parse(actionSource)
-    exports = actionData._3()
-
-    for node in tree.body:
-
-        wrapper = ast.Module(body=[node])
-        try:
-            co = compile(wrapper, "<ast>", 'exec')
-            exec (co)
-            resultQueue.put('success', actionData._2(), codegen.to_source(node), '')
-
-            #if this node is an assignment, we need to check if it needs to be persisted
-            try:
-                persistCode = ''
-                if(isinstance(node,ast.Assign)):
-                    varName = node.targets[0].id
-                    if(exports.containsKey(varName)):
-                        persistCode = varName + ".write.save(\"" + env.working_dir + "/" + job_id + "/" + actionData._2() + "/" + varName + "\", format=\"" + exports[varName] + "\", mode='overwrite')"
-                        persist = compile(persistCode, '<stdin>', 'exec')
-                        exec(persist)
-
-            except:
-                resultQueue.put('error', actionData._2(), persistCode, str(sys.exc_info()[1]))
-        except:
-            resultQueue.put('error', actionData._2(), codegen.to_source(node), str(sys.exc_info()[1]))
-    resultQueue.put('completion', '', '', '')
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkExecutionQueue.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkExecutionQueue.scala
deleted file mode 100755 (executable)
index 411069a..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
-
-import java.util
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-
-
-class PySparkExecutionQueue {
-
-  val queue = new LinkedBlockingQueue[(String, String, util.Map[String, String])]()
-
-  def getNext(): (String, String, util.Map[String, String]) = {
-
-    // if the queue is idle for an hour it will return null which
-    // terminates the python execution, need to revisit
-    queue.poll(1, TimeUnit.HOURS)
-
-  }
-
-  def setForExec(line: (String, String, util.Map[String, String])) = {
-
-    queue.put(line)
-
-  }
-
-}
\ No newline at end of file
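
PySparkExecutionQueue is a thin wrapper over a LinkedBlockingQueue whose getNext blocks with a one-hour timeout and returns null on expiry, which ends the Python interpreter loop. A self-contained sketch of that handshake, using illustrative names and a shorter timeout:

    import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

    object QueueHandshake extends App {
      // (source, actionName, exports) tuples flow from the JVM runner to the Python interpreter
      val queue = new LinkedBlockingQueue[(String, String, java.util.Map[String, String])]()

      // producer side (PySparkRunner.interpretSources): enqueue a snippet for execution
      queue.put(("x = 1", "start", new java.util.HashMap[String, String]()))

      // consumer side (polled from spark_intp.py through Py4J): blocks up to the
      // timeout and returns null on expiry -- the original waits one hour
      val next = queue.poll(1, TimeUnit.SECONDS)
      println(if (next == null) "timed out, interpreter exits" else s"executing action ${next._2}")
    }
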
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkResultQueue.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkResultQueue.scala
deleted file mode 100755 (executable)
index 6dbd445..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
-
-import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.ResultType.ResultType
-
-object ResultType extends Enumeration {
-  type ResultType = Value
-  val success = Value("success")
-  val error = Value("error")
-  val completion = Value("completion")
-}
-
-case class PySparkResult(
-  resultType: ResultType,
-  action: String,
-  statement: String,
-  message: String
-)
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
deleted file mode 100755 (executable)
index 79fe18a..0000000
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
-
-import java.io.File
-import java.util
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.spark.SparkEnv
-import org.apache.spark.sql.SparkSession
-
-import scala.sys.process.{Process, ProcessLogger}
-
-
-
-
-class PySparkRunner extends AmaterasuRunner with Logging {
-
-  var proc: Process = _
-  var notifier: Notifier = _
-
-  override def getIdentifier: String = "pyspark"
-
-  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-    interpretSources(actionSource, actionName, exports)
-  }
-
-  def interpretSources(source: String, actionName: String, exports: util.Map[String, String]): Unit = {
-
-    PySparkEntryPoint.getExecutionQueue.setForExec((source, actionName, exports))
-    val resQueue = PySparkEntryPoint.getResultQueue(actionName)
-
-    notifier.info(s"================= started action $actionName =================")
-
-    var res: PySparkResult = null
-
-    do {
-      res = resQueue.getNext()
-      res.resultType match {
-        case ResultType.success =>
-          notifier.success(res.statement)
-        case ResultType.error =>
-          notifier.error(res.statement, res.message)
-          throw new Exception(res.message)
-        case ResultType.completion =>
-          notifier.info(s"================= finished action $actionName =================")
-      }
-    } while (res != null && res.resultType != ResultType.completion)
-  }
-
-}
-
-object PySparkRunner {
-
-  def collectCondaPackages(): String = {
-    val pkgsDirs = new File("./miniconda/pkgs")
-    (pkgsDirs.listFiles.filter {
-      file => file.getName.endsWith(".tar.bz2")
-    }.map {
-      file => s"./miniconda/pkgs/${file.getName}"
-    }.toBuffer ++ "dist/codegen.py").mkString(",")
-  }
-
-  def apply(env: Environment,
-            jobId: String,
-            notifier: Notifier,
-            spark: SparkSession,
-            pypath: String,
-            pyDeps: PythonDependencies,
-            config: ClusterConfig): PySparkRunner = {
-
-    val shellLoger = ProcessLogger(
-      (o: String) => println(o),
-      (e: String) => println(e)
-    )
-
-    //TODO: can we make this less ugly?
-
-
-    val result = new PySparkRunner
-
-    PySparkEntryPoint.start(spark, jobId, env, SparkEnv.get)
-    val port = PySparkEntryPoint.getPort
-    var intpPath = ""
-    if (env.configuration.contains("cwd")) {
-      val cwd = new File(env.configuration("cwd"))
-      intpPath = s"${cwd.getAbsolutePath}/spark_intp.py" // This is to support test environment
-    } else {
-      intpPath = s"spark_intp.py"
-    }
-    var pysparkPath = ""
-    var condaPkgs = ""
-    if (pyDeps != null)
-      condaPkgs = collectCondaPackages()
-    var sparkCmd: Seq[String] = Seq()
-    config.mode match {
-      case "yarn" =>
-        pysparkPath = s"spark/bin/spark-submit"
-        sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, "--master", "yarn", intpPath, port.toString)
-        val proc = Process(sparkCmd, None,
-          "PYTHONPATH" -> pypath,
-          "PYTHONHASHSEED" -> 0.toString)
-
-        proc.run(shellLoger)
-      case "mesos" =>
-        pysparkPath = config.pysparkPath
-        if (pysparkPath.endsWith("spark-submit")) {
-          sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, intpPath, port.toString)
-        }
-        else {
-          sparkCmd = Seq(pysparkPath, intpPath, port.toString)
-        }
-        var pysparkPython = "/usr/bin/python"
-
-        if (pyDeps != null &&
-          pyDeps.packages.nonEmpty) {
-          pysparkPython = "./miniconda/bin/python"
-        }
-        val proc = Process(sparkCmd, None,
-          "PYTHONPATH" -> pypath,
-          "PYSPARK_PYTHON" -> pysparkPython,
-        "PYTHONHASHSEED" -> 0.toString)
-
-        proc.run(shellLoger)
-    }
-
-    result.notifier = notifier
-
-    result
-  }
-
-}
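
PySparkRunner.apply launches spark_intp.py through spark-submit, and the command line differs by cluster mode. A simplified sketch of that command assembly (names are illustrative; the real code also passes PYTHONPATH, PYSPARK_PYTHON and PYTHONHASHSEED to the child process):

    object PySparkCommand {
      def apply(mode: String,
                pysparkPath: String,
                condaPkgs: String,
                intpPath: String,
                port: Int): Seq[String] = mode match {
        case "yarn" =>
          // YARN always goes through spark-submit, shipping conda packages as --py-files
          Seq("spark/bin/spark-submit", "--py-files", condaPkgs, "--master", "yarn", intpPath, port.toString)
        case "mesos" if pysparkPath.endsWith("spark-submit") =>
          Seq(pysparkPath, "--py-files", condaPkgs, intpPath, port.toString)
        case "mesos" =>
          // a plain pyspark launcher gets no --py-files flag
          Seq(pysparkPath, intpPath, port.toString)
      }
    }

    // e.g. PySparkCommand("yarn", "", "./miniconda/pkgs/foo.tar.bz2,dist/codegen.py", "spark_intp.py", 43521)
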
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/ResultQueue.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/ResultQueue.scala
deleted file mode 100755 (executable)
index 3ac7bd7..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
-
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-
-
-class ResultQueue {
-  val queue = new LinkedBlockingQueue[PySparkResult]()
-
-  def getNext(): PySparkResult = {
-
-    // if the queue is idle for an hour it will return null which
-    // terminates the python execution, need to revisit
-    queue.poll(10, TimeUnit.MINUTES)
-
-  }
-
-  def put(
-    resultType: String,
-    action: String,
-    statement: String,
-    message: String
-  ) = {
-
-    val result = new PySparkResult(ResultType.withName(resultType), action, statement, message)
-    queue.put(result)
-  }
-}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRRunner.scala
deleted file mode 100644 (file)
index d111cfb..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark
-
-import java.io.ByteArrayOutputStream
-import java.util
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.spark.SparkContext
-
-
-class SparkRRunner extends Logging with AmaterasuRunner {
-
-  override def getIdentifier = "spark-r"
-
-  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-  }
-}
-
-object SparkRRunner {
-  def apply(
-    env: Environment,
-    jobId: String,
-    sparkContext: SparkContext,
-    outStream: ByteArrayOutputStream,
-    notifier: Notifier,
-    jars: Seq[String]
-  ): SparkRRunner = {
-    new SparkRRunner()
-  }
-}
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
deleted file mode 100644 (file)
index ba7ff03..0000000
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark
-
-import java.io._
-
-import com.jcabi.aether.Aether
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.ExecData
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.PySparkRunner
-import org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql.SparkSqlRunner
-import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider}
-import org.apache.spark.repl.amaterasu.runners.spark.{SparkRunnerHelper, SparkScalaRunner}
-import org.eclipse.aether.util.artifact.JavaScopes
-import org.sonatype.aether.repository.RemoteRepository
-import org.sonatype.aether.util.artifact.DefaultArtifact
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.concurrent.TrieMap
-import scala.sys.process._
-
-class SparkRunnersProvider extends RunnersProvider with Logging {
-
-  private val runners = new TrieMap[String, AmaterasuRunner]
-  private var shellLoger = ProcessLogger(
-    (o: String) => log.info(o),
-    (e: String) => log.error(e)
-
-  )
-  private var conf: Option[Map[String, Any]] = _
-  private var executorEnv: Option[Map[String, Any]] = _
-  private var clusterConfig: ClusterConfig = _
-
-  override def init(execData: ExecData,
-                    jobId: String,
-                    outStream: ByteArrayOutputStream,
-                    notifier: Notifier,
-                    executorId: String,
-                    config: ClusterConfig,
-                    hostName: String): Unit = {
-
-    shellLoger = ProcessLogger(
-      (o: String) => log.info(o),
-      (e: String) => log.error("", e)
-    )
-    clusterConfig = config
-    var jars = Seq.empty[String]
-
-    if (execData.deps != null) {
-      jars ++= getDependencies(execData.deps)
-    }
-
-    if (execData.pyDeps != null &&
-      execData.pyDeps.packages.nonEmpty) {
-      loadPythonDependencies(execData.pyDeps, notifier)
-    }
-
-    conf = execData.configurations.get("spark")
-    executorEnv = execData.configurations.get("spark_exec_env")
-    val sparkAppName = s"job_${jobId}_executor_$executorId"
-
-    SparkRunnerHelper.notifier = notifier
-    val spark = SparkRunnerHelper.createSpark(execData.env, sparkAppName, jars, conf, executorEnv, config, hostName)
-
-    lazy val sparkScalaRunner = SparkScalaRunner(execData.env, jobId, spark, outStream, notifier, jars)
-    sparkScalaRunner.initializeAmaContext(execData.env)
-
-    runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
-    var pypath = ""
-    // TODO: get rid of hard-coded version
-    config.mode match {
-      case "yarn" =>
-        pypath = s"$$PYTHONPATH:$$SPARK_HOME/python:$$SPARK_HOME/python/build:${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip:${new File(".").getAbsolutePath}"
-      case "mesos" =>
-        pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
-    }
-    lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, pypath, execData.pyDeps, config)
-    runners.put(pySparkRunner.getIdentifier, pySparkRunner)
-
-    lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark)
-    runners.put(sparkSqlRunner.getIdentifier, sparkSqlRunner)
-  }
-
-  private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
-    val channel = pythonPackage.channel.getOrElse("anaconda")
-    if (channel == "anaconda") {
-      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
-    } else {
-      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
-    }
-  }
-
-  private def installAnacondaOnNode(): Unit = {
-    // TODO: get rid of hard-coded version
-
-    this.clusterConfig.mode match {
-      case "yarn" => Seq("sh", "-c", "export HOME=$PWD && ./miniconda.sh -b -p miniconda") ! shellLoger
-      case "mesos" => Seq("sh", "Miniconda2-latest-Linux-x86_64.sh", "-b", "-p", "miniconda") ! shellLoger
-    }
-
-    Seq("bash", "-c", "export HOME=$PWD && ./miniconda/bin/python -m conda install -y conda-build") ! shellLoger
-    Seq("bash", "-c", "ln -s spark/python/pyspark miniconda/pkgs/pyspark") ! shellLoger
-  }
-
-  private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
-    notifier.info("loading anaconda evn")
-    installAnacondaOnNode()
-    val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
-    installAnacondaPackage(codegenPackage)
-    try {
-      // notifier.info("loadPythonDependencies #5")
-      deps.packages.foreach(pack => {
-        pack.index.getOrElse("anaconda").toLowerCase match {
-          case "anaconda" => installAnacondaPackage(pack)
-          // case "pypi" => installPyPiPackage(pack)
-        }
-      })
-    }
-    catch {
-
-      case rte: RuntimeException =>
-        val sw = new StringWriter
-        rte.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
-      case e: Exception =>
-        val sw = new StringWriter
-        e.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
-    }
-  }
-
-  override def getGroupIdentifier: String = "spark"
-
-  override def getRunner(id: String): AmaterasuRunner = runners(id)
-
-  private def getDependencies(deps: Dependencies): Seq[String] = {
-
-    // adding a local repo because Aether needs one
-    val repo = new File(System.getProperty("java.io.tmpdir"), "ama-repo")
-
-    val remotes = deps.repos.map(r =>
-      new RemoteRepository(
-        r.id,
-        r.`type`,
-        r.url
-      )).toList.asJava
-
-    val aether = new Aether(remotes, repo)
-
-    deps.artifacts.flatMap(a => {
-      aether.resolve(
-        new DefaultArtifact(a.groupId, a.artifactId, "", "jar", a.version),
-        JavaScopes.RUNTIME
-      ).map(a => a)
-    }).map(x => x.getFile.getAbsolutePath)
-
-  }
-}
\ No newline at end of file
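
SparkRunnersProvider keeps its runners in a TrieMap keyed by each runner's identifier and resolves them in getRunner. A stripped-down sketch of that registry pattern, with stand-in types for the SDK's AmaterasuRunner and RunnersProvider interfaces:

    import scala.collection.concurrent.TrieMap

    trait MiniRunner { def getIdentifier: String }

    class MiniProvider {
      private val runners = new TrieMap[String, MiniRunner]

      def register(runner: MiniRunner): Unit =
        runners.put(runner.getIdentifier, runner)

      // mirrors SparkRunnersProvider.getRunner: apply() throws for unknown identifiers
      def getRunner(id: String): MiniRunner = runners(id)
    }

    // usage: new MiniProvider().register(new MiniRunner { def getIdentifier = "pyspark" })
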
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala
deleted file mode 100644 (file)
index 350ddb4..0000000
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql
-
-import java.io.File
-import java.util
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.runtime.AmaContext
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.commons.io.FilenameUtils
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-import scala.collection.JavaConverters._
-
-/**
-  * Amaterasu currently supports JSON and PARQUET as data sources.
-  * CSV data source support will be provided in later versions.
-  */
-class SparkSqlRunner extends Logging with AmaterasuRunner {
-  var env: Environment = _
-  var notifier: Notifier = _
-  var jobId: String = _
-  //var actionName: String = _
-  var spark: SparkSession = _
-
-  /*
-  Method: executeSource
-  Description: when the user specifies a query in the Amaterasu format, this method parses and executes it.
-               If the query is not in the Amaterasu format, it is executed directly.
-  @Params: query string
-   */
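-  // Illustrative query (action/dataframe names are examples only):
-  //   SELECT * FROM AMACONTEXT_prevAction_myDf WHERE age > 20 READAS parquet
-  // references the dataframe `myDf` exported by the action `prevAction`; the
-  // optional READAS clause selects the load format (parquet by default).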
-  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-
-    notifier.info(s"================= started action $actionName =================")
-
-    if (!actionSource.isEmpty) {
-
-      var result: DataFrame = null
-      if (actionSource.toLowerCase.contains("amacontext")) {
-
-        //Parse the incoming query
-        //notifier.info(s"================= parsing the SQL query =================")
-
-        val parser: List[String] = actionSource.toLowerCase.split(" ").toList
-        var sqlPart1: String = ""
-        var sqlPart2: String = ""
-        var queryTempLen: Int = 0
-
-        //get only the sql part of the query
-        for (i <- 0 to parser.indexOf("from")) {
-          sqlPart1 += parser(i) + " "
-        }
-
-        if (parser.indexOf("readas") == -1) {
-          queryTempLen = parser.length - 1
-        }
-        else
-          queryTempLen = parser.length - 3
-
-        for (i <- parser.indexOf("from") + 1 to queryTempLen) {
-          if (!parser(i).contains("amacontext"))
-            sqlPart2 += " " + parser(i)
-        }
-
-        //If no read format is specified by the user, use PARQUET as the default file format to load data
-        var fileFormat: String = null
-        //if there is no index for "readas" keyword, then set PARQUET as default read format
-        if (parser.indexOf("readas") == -1) {
-          fileFormat = "parquet"
-        }
-        else
-          fileFormat = parser(parser.indexOf("readas") + 1)
-
-
-        val locationPath: String = parser.filter(word => word.contains("amacontext")).mkString("")
-        val directories = locationPath.split("_")
-        val actionName = directories(1)
-        val dfName = directories(2)
-        val parsedQuery = sqlPart1 + locationPath + sqlPart2
-
-        //Load the dataframe from previous action
-        val loadData: DataFrame = AmaContext.getDataFrame(actionName, dfName, fileFormat)
-        loadData.createOrReplaceTempView(locationPath)
-
-
-        try{
-
-        result = spark.sql(parsedQuery)
-        notifier.success(parsedQuery)
-        } catch {
-          case e: Exception => notifier.error(parsedQuery, e.getMessage)
-        }
-
-      }
-      else {
-
-        notifier.info("Executing SparkSql on: " + actionSource)
-
-        result = spark.sql(actionSource)
-      }
-      val exportsBuff = exports.asScala.toBuffer
-      if (exportsBuff.nonEmpty) {
-        val exportName = exportsBuff.head._1
-        val exportFormat = exportsBuff.head._2
-        //notifier.info(s"exporting to -> ${env.workingDir}/$jobId/$actionName/$exportName")
-        result.write.mode(SaveMode.Overwrite).format(exportFormat).save(s"${env.workingDir}/$jobId/$actionName/$exportName")
-      }
-      notifier.info(s"================= finished action $actionName =================")
-    }
-  }
-
-  /*
-  Method to find the file type of files within a directory
-  @Params
-  folderName : Path to location of the directory containing data-source files
-  */
-
-  def findFileType(folderName: File): Array[String] = {
-    // get all the files from a directory
-    val files: Array[File] = folderName.listFiles()
-    val extensions: Array[String] = files.map(file => FilenameUtils.getExtension(file.toString))
-    extensions
-  }
-
-  override def getIdentifier: String = "sql"
-
-}
-
-object SparkSqlRunner {
-
-  def apply(env: Environment,
-            jobId: String,
-            // actionName: String,
-            notifier: Notifier,
-            spark: SparkSession): SparkSqlRunner = {
-
-    val sparkSqlRunnerObj = new SparkSqlRunner
-
-    sparkSqlRunnerObj.env = env
-    sparkSqlRunnerObj.jobId = jobId
-    //sparkSqlRunnerObj.actionName = actionName
-    sparkSqlRunnerObj.notifier = notifier
-    sparkSqlRunnerObj.spark = spark
-    sparkSqlRunnerObj
-  }
-}
index 9ab75be..90c2001 100755 (executable)
@@ -26,7 +26,6 @@ import org.apache.amaterasu.executor.common.executors.ProvidersFactory
 import org.apache.mesos.Protos._
 import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Executor, ExecutorDriver, MesosExecutorDriver}
-import org.apache.spark.SparkContext
 
 import scala.collection.JavaConverters._
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -37,7 +36,6 @@ class MesosActionsExecutor extends Executor with Logging {
 
   var master: String = _
   var executorDriver: ExecutorDriver = _
-  var sc: SparkContext = _
   var jobId: String = _
   var actionName: String = _
   //  var sparkScalaRunner: SparkScalaRunner = _
@@ -83,7 +81,7 @@ class MesosActionsExecutor extends Executor with Logging {
     notifier = new MesosNotifier(driver)
     notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} registered")
     val outStream = new ByteArrayOutputStream()
-    providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName, propFile = "./amaterasu.properties")
+    providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName, "./amaterasu.properties")
 
   }
 
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/runtime/AmaContext.scala b/executor/src/main/scala/org/apache/amaterasu/executor/runtime/AmaContext.scala
deleted file mode 100755 (executable)
index a61cd5a..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.runtime
-
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.spark.SparkContext
-import org.apache.spark.sql._
-
-object AmaContext extends Logging {
-
-  var spark: SparkSession = _
-  var sc: SparkContext = _
-  var jobId: String = _
-  var env: Environment = _
-
-  def init(spark: SparkSession,
-           jobId: String,
-           env: Environment): Unit = {
-
-    AmaContext.spark = spark
-    AmaContext.sc = spark.sparkContext
-    AmaContext.jobId = jobId
-    AmaContext.env = env
-
-  }
-
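-  // Reads a dataframe persisted by a previous action from the job's working
-  // directory, e.g. AmaContext.getDataFrame("start", "x") loads
-  // s"${env.workingDir}/$jobId/start/x" in the given format (parquet by default).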
-  def getDataFrame(actionName: String, dfName: String, format: String = "parquet"): DataFrame = {
-
-    spark.read.format(format).load(s"${env.workingDir}/$jobId/$actionName/$dfName")
-
-  }
-
-  def getDataset[T: Encoder](actionName: String, dfName: String, format: String = "parquet"): Dataset[T] = {
-
-    getDataFrame(actionName, dfName, format).as[T]
-
-  }
-
-}
index f4f553c..b5f8700 100644 (file)
@@ -19,24 +19,18 @@ package org.apache.amaterasu.executor.yarn.executors
 import java.io.ByteArrayOutputStream
 import java.net.{InetAddress, URLDecoder}
 
-import scala.collection.JavaConverters._
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.executor.common.executors.{ActiveNotifier, ProvidersFactory}
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.spark.SparkContext
 
-import scala.reflect.internal.util.ScalaClassLoader
-import scala.reflect.internal.util.ScalaClassLoader.URLClassLoader
+import scala.collection.JavaConverters._
 
 
 class ActionsExecutor extends Logging {
 
   var master: String = _
-  var sc: SparkContext = _
   var jobId: String = _
   var actionName: String = _
   var taskData: TaskData = _
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/AmaSparkILoop.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/AmaSparkILoop.scala
deleted file mode 100755 (executable)
index 19ef3de..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.repl.amaterasu
-
-import java.io.PrintWriter
-
-import org.apache.spark.repl.SparkILoop
-
-import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.IMain
-
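-// Thin wrapper around Spark's REPL that exposes the protected interpreter
-// creation, settings and IMain instance so SparkRunnerHelper can drive the
-// interpreter programmatically.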
-class AmaSparkILoop(writer: PrintWriter) extends SparkILoop(None, writer) {
-
-  def create = {
-    this.createInterpreter
-  }
-
-  def setSettings(settings: Settings) = {
-    this.settings = settings
-  }
-
-  def getIntp: IMain = {
-    this.intp
-  }
-
-}
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
deleted file mode 100644 (file)
index f2c2afa..0000000
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.repl.amaterasu.runners.spark
-
-import java.io.{ByteArrayOutputStream, File, PrintWriter}
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.common.utils.FileUtils
-import org.apache.spark.repl.amaterasu.AmaSparkILoop
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.Utils
-import org.apache.spark.SparkConf
-
-import scala.tools.nsc.GenericRunnerSettings
-import scala.tools.nsc.interpreter.IMain
-
-object SparkRunnerHelper extends Logging {
-
-  private val conf = new SparkConf()
-  private val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
-  private val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
-
-  private var sparkSession: SparkSession = _
-
-  var notifier: Notifier = _
-
-  private var interpreter: IMain = _
-
-  def getNode: String = sys.env.get("AMA_NODE") match {
-    case None => "127.0.0.1"
-    case _ => sys.env("AMA_NODE")
-  }
-
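-  // Returns the cached Scala interpreter, creating it on first use (or when
-  // recreate is requested) with the REPL output directory and the job's jars
-  // on its classpath.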
-  def getOrCreateScalaInterperter(outStream: ByteArrayOutputStream, jars: Seq[String], recreate: Boolean = false): IMain = {
-    if (interpreter == null || recreate) {
-      initInterpreter(outStream, jars)
-    }
-    interpreter
-  }
-
-  private def scalaOptionError(msg: String): Unit = {
-    notifier.error("", msg)
-  }
-
-  private def initInterpreter(outStream: ByteArrayOutputStream, jars: Seq[String]): Unit = {
-
-    var result: IMain = null
-    val config = new ClusterConfig()
-    try {
-
-      val interpArguments = List(
-        "-Yrepl-class-based",
-        "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
-        "-classpath", jars.mkString(File.separator)
-      )
-
-      val settings = new GenericRunnerSettings(scalaOptionError)
-      settings.processArguments(interpArguments, processAll = true)
-
-      settings.classpath.append(System.getProperty("java.class.path") + java.io.File.pathSeparator +
-        "spark-" + config.Webserver.sparkVersion + "/jars/*" + java.io.File.pathSeparator +
-        jars.mkString(java.io.File.pathSeparator))
-
-      settings.usejavacp.value = true
-
-      val out = new PrintWriter(outStream)
-      val interpreter = new AmaSparkILoop(out)
-      interpreter.setSettings(settings)
-
-      interpreter.create
-
-      val intp = interpreter.getIntp
-
-      settings.embeddedDefaults(Thread.currentThread().getContextClassLoader)
-      intp.setContextClassLoader
-      intp.initializeSynchronous
-
-      result = intp
-    }
-    catch {
-      case e: Exception =>
-        println(new Predef.String(outStream.toByteArray))
-
-    }
-
-    interpreter = result
-  }
-
-
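-  // Builds the SparkSession for the runners: layers the spark options from
-  // amaterasu.properties, the job-level sparkConf (spark.yml) and the
-  // executorEnv (spark_exec.yml) onto a mode-specific (mesos/yarn) SparkConf
-  // before creating the session.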
-  def createSpark(env: Environment,
-                  sparkAppName: String,
-                  jars: Seq[String],
-                  sparkConf: Option[Map[String, Any]],
-                  executorEnv: Option[Map[String, Any]],
-                  config: ClusterConfig,
-                  hostName: String): SparkSession = {
-
-    Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
-    val minicondaPkgsPath = "miniconda/pkgs"
-    val executorMinicondaDirRef = new File(minicondaPkgsPath)
-    val minicondaFiles = if (executorMinicondaDirRef.exists) FileUtils.getAllFiles(executorMinicondaDirRef) else new Array[File](0)
-    val pyfiles = minicondaFiles.filter(f => f.getName.endsWith(".py") ||
-      f.getName.endsWith(".egg") ||
-      f.getName.endsWith(".zip"))
-
-    conf.setAppName(sparkAppName)
-      .set("spark.driver.host", hostName)
-      .set("spark.submit.deployMode", "client")
-      .set("spark.hadoop.validateOutputSpecs", "false")
-      .set("spark.logConf", "true")
-      .set("spark.submit.pyFiles", pyfiles.mkString(","))
-
-
-    val master: String = if (env.master.isEmpty) {
-      "yarn"
-    } else {
-      env.master
-    }
-
-    config.mode match {
-
-      case "mesos" =>
-        conf.set("spark.executor.uri", s"http://$getNode:${config.Webserver.Port}/spark-2.2.1-bin-hadoop2.7.tgz")
-          .setJars(jars)
-          .set("spark.master", env.master)
-          .set("spark.home", s"${scala.reflect.io.File(".").toCanonical.toString}/spark-2.2.1-bin-hadoop2.7")
-
-      case "yarn" =>
-        conf.set("spark.home", config.spark.home)
-          // TODO: parameterize those
-          .setJars(s"executor.jar" +: jars)
-          .set("spark.history.kerberos.keytab", "/etc/security/keytabs/spark.headless.keytab")
-          .set("spark.driver.extraLibraryPath", "/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64")
-          .set("spark.yarn.queue", "default")
-          .set("spark.history.kerberos.principal", "none")
-
-          .set("spark.master", master)
-          .set("spark.executor.instances", config.spark.opts.getOrElse("executor.instances", "1"))
-          .set("spark.yarn.jars", s"spark/jars/*")
-          .set("spark.executor.memory", config.spark.opts.getOrElse("executor.memory", "1g"))
-          .set("spark.dynamicAllocation.enabled", "false")
-          .set("spark.eventLog.enabled", "false")
-          .set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
-          .set("hadoop.home.dir", config.YARN.hadoopHomeDir)
-
-      case _ => throw new Exception(s"mode ${config.mode} is not legal.")
-    }
-
-    if (config.spark.opts != null && config.spark.opts.nonEmpty) {
-      config.spark.opts.foreach(kv => {
-        log.info(s"Setting ${kv._1} to ${kv._2} as specified in amaterasu.properties")
-        conf.set(kv._1, kv._2)
-      })
-    }
-
-    // adding the configurations from spark.yml
-    sparkConf match {
-      case Some(cnf) => {
-        for (c <- cnf) {
-          if (c._2.isInstanceOf[String])
-            conf.set(c._1, c._2.toString)
-        }
-      }
-      case None =>
-    }
-
-    // setting the executor env from spark_exec.yml
-    executorEnv match {
-      case Some(env) => {
-        for (c <- env) {
-          if (c._2.isInstanceOf[String])
-            conf.setExecutorEnv(c._1, c._2.toString)
-        }
-      }
-      case None =>
-    }
-
-    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
-
-    sparkSession = SparkSession.builder
-      .appName(sparkAppName)
-      .master(env.master)
-
-      //.enableHiveSupport()
-      .config(conf).getOrCreate()
-
-    sparkSession.conf.getAll.foreach(x => log.info(x.toString))
-
-    val hc = sparkSession.sparkContext.hadoopConfiguration
-
-    sys.env.get("AWS_ACCESS_KEY_ID") match {
-      case None =>
-      case _ =>
-        hc.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
-        hc.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
-        hc.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
-    }
-
-    sparkSession
-  }
-}
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala
deleted file mode 100755 (executable)
index 56a04cf..0000000
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.repl.amaterasu.runners.spark
-
-import java.io.ByteArrayOutputStream
-import java.util
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.runtime.AmaContext
-import org.apache.amaterasu.sdk.AmaterasuRunner
-import org.apache.spark.sql.{Dataset, SparkSession}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.io.Source
-import scala.tools.nsc.interpreter.{IMain, Results}
-
-class ResHolder(var value: Any)
-
-class SparkScalaRunner(var env: Environment,
-                       var jobId: String,
-                       var interpreter: IMain,
-                       var outStream: ByteArrayOutputStream,
-                       var spark: SparkSession,
-                       var notifier: Notifier,
-                       var jars: Seq[String]) extends Logging with AmaterasuRunner {
-
-  private def scalaOptionError(msg: String): Unit = {
-    notifier.error("", msg)
-  }
-
-  override def getIdentifier = "scala"
-
-  val holder = new ResHolder(null)
-
-  override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = {
-    val source = Source.fromString(actionSource)
-    interpretSources(source, actionName, exports.asScala.toMap)
-  }
-
-  def interpretSources(source: Source, actionName: String, exports: Map[String, String]): Unit = {
-
-    notifier.info(s"================= started action $actionName =================")
-    //notifier.info(s"exports is: $exports")
-
-    for (line <- source.getLines()) {
-
-      // ignoring empty or commented lines
-      if (!line.isEmpty && !line.trim.startsWith("*") && !line.startsWith("/")) {
-
-        outStream.reset()
-        log.debug(line)
-
-        if (line.startsWith("import")) {
-          interpreter.interpret(line)
-        }
-        else {
-
-          val intresult = interpreter.interpret(line)
-
-          val result = interpreter.prevRequestList.last.lineRep.call("$result")
-
-          // intresult: Success, Error, etc
-          // result: the actual result (RDD, df, etc.) for caching
-          // outStream.toString gives you the error message
-          intresult match {
-            case Results.Success =>
-              log.debug("Results.Success")
-
-              notifier.success(line)
-
-              val resultName = interpreter.prevRequestList.last.termNames.last
-
-              //notifier.info(s" result name ${resultName.toString}")
-              //notifier.info(s" exist in exports: ${exports.contains(resultName.toString)}")
-
-              if (exports.contains(resultName.toString)) {
-
-                val format = exports(resultName.toString)
-
-                if (result != null) {
-
-                  result match {
-                    case ds: Dataset[_] =>
-                      log.debug(s"persisting DataFrame: $resultName")
-                      val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.workingDir}/$jobId/$actionName/$resultName")"""
-                      val writeResult = interpreter.interpret(writeLine)
-                      if (writeResult != Results.Success) {
-                        val err = outStream.toString
-                        notifier.error(writeLine, err)
-                        log.error(s"error persisting dataset: $writeLine failed with: $err")
-                        //throw new Exception(err)
-                      }
-                      log.debug(s"persisted DataFrame: $resultName")
-
-                    case _ => notifier.info(s"""+++> result type ${result.getClass}""")
-                  }
-                }
-              }
-
-            case Results.Error =>
-              log.debug("Results.Error")
-              val err = outStream.toString
-              notifier.error(line, err)
-              throw new Exception(err)
-
-            case Results.Incomplete =>
-              log.debug("Results.Incomplete")
-
-          }
-        }
-      }
-    }
-
-    notifier.info(s"================= finished action $actionName =================")
-  }
-
-  def initializeAmaContext(env: Environment): Unit = {
-
-    // setting up some context :)
-    val sc = this.spark.sparkContext
-    val sqlContext = this.spark.sqlContext
-
-    interpreter.interpret("import scala.util.control.Exception._")
-    interpreter.interpret("import org.apache.spark.{ SparkContext, SparkConf }")
-    interpreter.interpret("import org.apache.spark.sql.SQLContext")
-    interpreter.interpret("import org.apache.spark.sql.{ Dataset, SparkSession }")
-    interpreter.interpret("import org.apache.spark.sql.SaveMode")
-    interpreter.interpret("import org.apache.amaterasu.executor.runtime.AmaContext")
-    interpreter.interpret("import org.apache.amaterasu.common.runtime.Environment")
-
-    // creating a map (_contextStore) to hold the different spark contexts
-    // in the REPL and getting a reference to it
-    interpreter.interpret("var _contextStore = scala.collection.mutable.Map[String, AnyRef]()")
-    val contextStore = interpreter.prevRequestList.last.lineRep.call("$result").asInstanceOf[mutable.Map[String, AnyRef]]
-    AmaContext.init(spark, jobId, env)
-
-    // populating the contextStore
-    contextStore.put("sc", sc)
-    contextStore.put("sqlContext", sqlContext)
-    contextStore.put("env", env)
-    contextStore.put("spark", spark)
-    contextStore.put("ac", AmaContext)
-
-    interpreter.interpret("val sc = _contextStore(\"sc\").asInstanceOf[SparkContext]")
-    interpreter.interpret("val sqlContext = _contextStore(\"sqlContext\").asInstanceOf[SQLContext]")
-    interpreter.interpret("val env = _contextStore(\"env\").asInstanceOf[Environment]")
-    interpreter.interpret("val spark = _contextStore(\"spark\").asInstanceOf[SparkSession]")
-    interpreter.interpret("val AmaContext = _contextStore(\"ac\").asInstanceOf[AmaContext]")
-    interpreter.interpret("import sqlContext.implicits._")
-
-    // initializing the AmaContext
-    println(s"""AmaContext.init(sc, sqlContext ,"$jobId")""")
-
-  }
-
-}
-
-object SparkScalaRunner extends Logging {
-
-  def apply(env: Environment,
-            jobId: String,
-            spark: SparkSession,
-            outStream: ByteArrayOutputStream,
-            notifier: Notifier,
-            jars: Seq[String]): SparkScalaRunner = {
-
-    new SparkScalaRunner(env, jobId, SparkRunnerHelper.getOrCreateScalaInterperter(outStream, jars), outStream, spark, notifier, jars)
-
-  }
-
-}
diff --git a/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv b/executor/src/test/resources/SparkSql/csv/SparkSqlTestCsv.csv
deleted file mode 100644 (file)
index 5f0ce0e..0000000
+++ /dev/null
@@ -1,4 +0,0 @@
-name,age
-sampath,22
-kirupa,30
-dev,19
\ No newline at end of file
diff --git a/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json b/executor/src/test/resources/SparkSql/json/SparkSqlTestData.json
deleted file mode 100644 (file)
index d297f1f..0000000
+++ /dev/null
@@ -1,3 +0,0 @@
-{"name":"Sampath","age":22}
-{"name":"Kirupa", "age":30}
-{"name":"Dev", "age":19}]
\ No newline at end of file
diff --git a/executor/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/executor/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
deleted file mode 100644 (file)
index 7d66b64..0000000
Binary files a/executor/src/test/resources/SparkSql/parquet/.part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc and /dev/null differ
diff --git a/executor/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc b/executor/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc
deleted file mode 100644 (file)
index 74b1890..0000000
Binary files a/executor/src/test/resources/SparkSql/parquet/.part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet.crc and /dev/null differ
diff --git a/executor/src/test/resources/SparkSql/parquet/_SUCCESS b/executor/src/test/resources/SparkSql/parquet/_SUCCESS
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/executor/src/test/resources/SparkSql/parquet/_common_metadata b/executor/src/test/resources/SparkSql/parquet/_common_metadata
deleted file mode 100644 (file)
index 5d83fd6..0000000
Binary files a/executor/src/test/resources/SparkSql/parquet/_common_metadata and /dev/null differ
diff --git a/executor/src/test/resources/SparkSql/parquet/_metadata b/executor/src/test/resources/SparkSql/parquet/_metadata
deleted file mode 100644 (file)
index ac54053..0000000
Binary files a/executor/src/test/resources/SparkSql/parquet/_metadata and /dev/null differ
diff --git a/executor/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644 (file)
index e1b0d2e..0000000
Binary files a/executor/src/test/resources/SparkSql/parquet/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet and /dev/null differ
diff --git a/executor/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644 (file)
index d807ba9..0000000
Binary files a/executor/src/test/resources/SparkSql/parquet/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet and /dev/null differ
diff --git a/executor/src/test/resources/amaterasu.properties b/executor/src/test/resources/amaterasu.properties
deleted file mode 100755 (executable)
index d402fed..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-zk=127.0.0.1
-version=0.2.0-incubating
-master=192.168.33.11
-user=root
-mode=mesos
-webserver.port=8000
-webserver.root=dist
-spark.version=2.1.1-bin-hadoop2.7
-pysparkPath = /usr/bin/python
diff --git a/executor/src/test/resources/py4j-0.10.4-src.zip b/executor/src/test/resources/py4j-0.10.4-src.zip
deleted file mode 100644 (file)
index 8c3829e..0000000
Binary files a/executor/src/test/resources/py4j-0.10.4-src.zip and /dev/null differ
diff --git a/executor/src/test/resources/py4j.tar.gz b/executor/src/test/resources/py4j.tar.gz
deleted file mode 100644 (file)
index 761a0af..0000000
Binary files a/executor/src/test/resources/py4j.tar.gz and /dev/null differ
diff --git a/executor/src/test/resources/pyspark-with-amacontext.py b/executor/src/test/resources/pyspark-with-amacontext.py
deleted file mode 100755 (executable)
index c940eea..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-class AmaContext(object):
-
-    def __init__(self, sc, spark, job_id, env):
-        self.sc = sc
-        self.spark = spark
-        self.job_id = job_id
-        self.env = env
-
-    def get_dataframe(self, action_name, dataset_name, format = "parquet"):
-        return self.spark.read.format(format).load(str(self.env.working_dir) + "/" + self.job_id + "/" + action_name + "/" + dataset_name)
-
-class Environment(object):
-
-    def __init__(self, name, master, input_root_path, output_root_path, working_dir, configuration):
-        self.name = name
-        self.master = master
-        self.input_root_path = input_root_path
-        self.output_root_path = output_root_path
-        self.working_dir = working_dir
-        self.configuration = configuration
-
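-# Note: these classes mirror the AmaContext/Environment objects that are
-# presumably injected by the PySpark runner at runtime as `ama_context`;
-# the lines below only use ama_context.sc, which the runner provides.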
-data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
-rdd = ama_context.sc.parallelize(data)
-odd = rdd.filter(lambda num: num % 2 != 0)
\ No newline at end of file
diff --git a/executor/src/test/resources/pyspark.tar.gz b/executor/src/test/resources/pyspark.tar.gz
deleted file mode 100644 (file)
index 6f25984..0000000
Binary files a/executor/src/test/resources/pyspark.tar.gz and /dev/null differ
diff --git a/executor/src/test/resources/pyspark.zip b/executor/src/test/resources/pyspark.zip
deleted file mode 100644 (file)
index a624c9f..0000000
Binary files a/executor/src/test/resources/pyspark.zip and /dev/null differ
diff --git a/executor/src/test/resources/simple-pyspark.py b/executor/src/test/resources/simple-pyspark.py
deleted file mode 100755 (executable)
index 923f81c..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
-try:
-    rdd = sc.parallelize(data)
-
-    def g(x):
-        print(x)
-
-    rdd.foreach(g)
-except Exception as e:
-    print type(e), e
\ No newline at end of file
diff --git a/executor/src/test/resources/simple-python-err.py b/executor/src/test/resources/simple-python-err.py
deleted file mode 100755 (executable)
index dff1491..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-data = [1, 2, 3, 4, 5]
-1/0
-
-with open('/tmp/amatest-in.txt', 'a') as the_file:
-    the_file.write('hi there\n') # python will convert \n to os.linesep
diff --git a/executor/src/test/resources/simple-python.py b/executor/src/test/resources/simple-python.py
deleted file mode 100755 (executable)
index 0ac6f85..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-data = [1, 2, 3, 4, 5]
-print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
-print(data)
-
-with open('/tmp/amatest-in.txt', 'a') as the_file:
-    the_file.write('hi there\n') # python will convert \n to os.linesep
diff --git a/executor/src/test/resources/simple-spark.scala b/executor/src/test/resources/simple-spark.scala
deleted file mode 100755 (executable)
index a11a458..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.amaterasu.executor.runtime.AmaContext
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-
-val data = Seq(1,3,4,5,6)
-
-
-val sc = AmaContext.sc
-val rdd = sc.parallelize(data)
-val sqlContext = AmaContext.spark
-
-import sqlContext.implicits._
-val x: DataFrame = rdd.toDF()
-
-x.write.mode(SaveMode.Overwrite)
\ No newline at end of file
diff --git a/executor/src/test/resources/step-2.scala b/executor/src/test/resources/step-2.scala
deleted file mode 100755 (executable)
index a3d034c..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.amaterasu.executor.runtime.AmaContext
-
-
-val highNoDf = AmaContext.getDataFrame("start", "x").where("age > 20")
-highNoDf.show
diff --git a/executor/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644 (file)
index e1b0d2e..0000000
Binary files a/executor/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet and /dev/null differ
diff --git a/executor/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
deleted file mode 100644 (file)
index d807ba9..0000000
Binary files a/executor/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet and /dev/null differ
diff --git a/executor/src/test/scala/org/apache/amaterasu/RunnersTests/RunnersLoadingTests.scala b/executor/src/test/scala/org/apache/amaterasu/RunnersTests/RunnersLoadingTests.scala
deleted file mode 100644 (file)
index 2decb9c..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.RunnersTests
-
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.scalatest._
-
-@DoNotDiscover
-class RunnersLoadingTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  var env: Environment = _
-  var factory: ProvidersFactory = _
-
-  "RunnersFactory" should "be loaded with all the implementations of AmaterasuRunner in its classpath" in {
-    val r = factory.getRunner("spark", "scala")
-    r should not be null
-  }
-}
\ No newline at end of file
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/PySparkRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/PySparkRunnerTests.scala
deleted file mode 100755 (executable)
index f12d676..0000000
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.spark
-
-import java.io.File
-
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.PySparkRunner
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
-
-import scala.collection.JavaConverters._
-import scala.io.Source
-
-@DoNotDiscover
-class PySparkRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  Logger.getLogger("org").setLevel(Level.OFF)
-  Logger.getLogger("akka").setLevel(Level.OFF)
-  Logger.getLogger("spark").setLevel(Level.OFF)
-  Logger.getLogger("jetty").setLevel(Level.OFF)
-  Logger.getRootLogger.setLevel(Level.OFF)
-
-  var factory: ProvidersFactory = _
-
-  def delete(file: File) {
-    if (file.isDirectory)
-      Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(delete(_))
-    file.delete
-  }
-
-  override protected def afterAll(): Unit = {
-    val pysparkDir = new File(getClass.getResource("/pyspark").getPath)
-    val py4jDir = new File(getClass.getResource("/py4j").getPath)
-    delete(pysparkDir)
-    delete(py4jDir)
-    super.afterAll()
-  }
-
-
-  "PySparkRunner.executeSource" should "execute simple python code" in {
-    val src = Source.fromFile(getClass.getResource("/simple-python.py").getPath).mkString
-    var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
-    println("3333333333333333333333")
-    runner.executeSource(src, "test_action1", Map.empty[String, String].asJava)
-  }
-
-  it should "print and trows an errors" in {
-    a[java.lang.Exception] should be thrownBy {
-      val src = Source.fromFile(getClass.getResource("/simple-python-err.py").getPath).mkString
-      var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
-      runner.executeSource(src, "test_action2", Map.empty[String, String].asJava)
-    }
-  }
-
-  it should "also execute spark code written in python" in {
-    val src = Source.fromFile(getClass.getResource("/simple-pyspark.py").getPath).mkString
-    var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
-    runner.executeSource(src, "test_action3", Map("numDS" -> "parquet").asJava)
-  }
-
-  it should "also execute spark code written in python with AmaContext being used" in {
-    val src = Source.fromFile(getClass.getResource("/pyspark-with-amacontext.py").getPath).mkString
-    var runner = factory.getRunner("spark", "pyspark").get.asInstanceOf[PySparkRunner]
-    runner.executeSource(src, "test_action4", Map.empty[String, String].asJava)
-  }
-
-}
\ No newline at end of file
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
deleted file mode 100755 (executable)
index 1d79fc9..0000000
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.spark
-
-
-import scala.collection.JavaConverters._
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.amaterasu.executor.runtime.AmaContext
-import org.apache.spark.repl.amaterasu.runners.spark.SparkScalaRunner
-import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
-
-import scala.io.Source
-
-@DoNotDiscover
-class SparkScalaRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-  var factory: ProvidersFactory = _
-  var runner: SparkScalaRunner = _
-
-
-  "SparkScalaRunner" should "execute the simple-spark.scala" in {
-
-    val sparkRunner =factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
-    val script = getClass.getResource("/simple-spark.scala").getPath
-    val sourceCode = Source.fromFile(script).getLines().mkString("\n")
-    sparkRunner.executeSource(sourceCode, "start", Map.empty[String, String].asJava)
-
-  }
-
-  "SparkScalaRunner" should "execute step-2.scala and access data from simple-spark.scala" in {
-
-    val sparkRunner =factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
-    val script = getClass.getResource("/step-2.scala").getPath
-    sparkRunner.env.workingDir = s"${getClass.getResource("/tmp").getPath}"
-    AmaContext.init(sparkRunner.spark,"job",sparkRunner.env)
-    val sourceCode = Source.fromFile(script).getLines().mkString("\n")
-    sparkRunner.executeSource(sourceCode, "cont", Map.empty[String, String].asJava)
-
-  }
-
-
-}
\ No newline at end of file
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala
deleted file mode 100644 (file)
index 90cf73b..0000000
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.spark
-
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql.SparkSqlRunner
-import org.apache.amaterasu.utilities.TestNotifier
-import org.apache.log4j.Logger
-import org.apache.log4j.Level
-import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
-
-import scala.collection.JavaConverters._
-
-/**
-  * Created by kirupa on 10/12/16.
-  */
-@DoNotDiscover
-class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  Logger.getLogger("org").setLevel(Level.OFF)
-  Logger.getLogger("akka").setLevel(Level.OFF)
-  Logger.getLogger("spark").setLevel(Level.OFF)
-  Logger.getLogger("jetty").setLevel(Level.OFF)
-  Logger.getRootLogger.setLevel(Level.OFF)
-
-
-  val notifier = new TestNotifier()
-
-  var factory: ProvidersFactory = _
-  var env: Environment = _
-
-  /*
-  Test whether parquet is used as default file format to load data from previous actions
-   */
-
-  "SparkSql" should "load data as parquet if no input foramt is specified" in {
-
-    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
-    val spark: SparkSession = sparkSql.spark
-
-    //Prepare test dataset
-    val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
-
-    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.workingDir}/${sparkSql.jobId}/sparksqldefaultparquetjobaction/sparksqldefaultparquetjobactiontempdf")
-    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqldefaultparquetjobaction_sparksqldefaultparquetjobactiontempdf where age=22", "sql_parquet_test", Map("result" -> "parquet").asJava)
-
-    val outputDf = spark.read.parquet(s"${env.workingDir}/${sparkSql.jobId}/sql_parquet_test/result")
-    println("Output Default Parquet: " + inputDf.count + "," + outputDf.first().getString(1))
-    outputDf.first().getString(1) shouldEqual "Michael"
-  }
-
-  /*
-  Test whether the parquet data is successfully parsed, loaded and processed by SparkSQL
-   */
-
-  "SparkSql" should "load PARQUET data directly from previous action's dataframe and persist the Data in working directory" in {
-
-    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
-    val spark: SparkSession = sparkSql.spark
-
-    //Prepare test dataset
-    val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
-    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.workingDir}/${sparkSql.jobId}/sparksqlparquetjobaction/sparksqlparquetjobactiontempdf")
-    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlparquetjobaction_sparksqlparquetjobactiontempdf READAS parquet", "sql_parquet_test", Map("result2" -> "parquet").asJava)
-
-    val outputDf = spark.read.parquet(s"${env.workingDir}/${sparkSql.jobId}/sql_parquet_test/result2")
-    println("Output Parquet: " + inputDf.count + "," + outputDf.count)
-    inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
-  }
-
-
-  /*
-  Test whether the JSON data is successfully parsed, loaded by SparkSQL
-  */
-
-  "SparkSql" should "load JSON data directly from previous action's dataframe and persist the Data in working directory" in {
-
-    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
-    val spark: SparkSession = sparkSql.spark
-
-    //Prepare test dataset
-    val inputDf = spark.read.json(getClass.getResource("/SparkSql/json").getPath)
-
-    inputDf.write.mode(SaveMode.Overwrite).json(s"${env.workingDir}/${sparkSql.jobId}/sparksqljsonjobaction/sparksqljsonjobactiontempdf")
-    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqljsonjobaction_sparksqljsonjobactiontempdf  where age='30' READAS json", "sql_json_test", Map("result" -> "json").asJava)
-
-    val outputDf = spark.read.json(s"${env.workingDir}/${sparkSql.jobId}/sql_json_test/result")
-    println("Output JSON: " + inputDf.count + "," + outputDf.count)
-    outputDf.first().getString(1) shouldEqual "Kirupa"
-
-  }
-
-  /*
-  Test whether the CSV data is successfully parsed, loaded by SparkSQL
-  */
-
-  "SparkSql" should "load CSV data directly from previous action's dataframe and persist the Data in working directory" in {
-
-    val sparkSql: SparkSqlRunner = factory.getRunner("spark", "sql").get.asInstanceOf[SparkSqlRunner]
-    val spark: SparkSession = sparkSql.spark
-
-    //Prepare test dataset
-    val inputDf = spark.read.csv(getClass.getResource("/SparkSql/csv").getPath)
-    inputDf.write.mode(SaveMode.Overwrite).csv(s"${env.workingDir}/${sparkSql.jobId}/sparksqlcsvjobaction/sparksqlcsvjobactiontempdf")
-    sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlcsvjobaction_sparksqlcsvjobactiontempdf READAS csv", "sql_csv_test", Map("result" -> "csv").asJava)
-
-
-    val outputDf = spark.read.csv(s"${env.workingDir}/${sparkSql.jobId}/sql_csv_test/result")
-    println("Output CSV: " + inputDf.count + "," + outputDf.count)
-    inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
-  }
-
-  /*
-  Test whether the data can be directly read from a file and executed by sparkSql
-  */
-//  "SparkSql" should "load data directly from a file and persist the Data in working directory" in {
-//
-//    val tempFileEnv = Environment()
-//    tempFileEnv.workingDir = "file:/tmp/"
-//    AmaContext.init(spark, "sparkSqlFileJob", tempFileEnv)
-//
-//    val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlFileJob", notifier, spark)
-//    sparkSql.executeSource("SELECT * FROM parquet.`" + getClass.getResource("/SparkSql/parquet").getPath + "`", "sql_parquet_file_test", Map("result" -> "parquet").asJava)
-//    val outputParquetDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
-//    println("Output Parquet dataframe: " + outputParquetDf.show)
-//    outputParquetDf.first().getString(1) shouldEqual "Michael"
-//    sparkSql.executeSource("SELECT * FROM json.`" + getClass.getResource("/SparkSql/json").getPath + "`","sql_parquet_file_test", Map("result" -> "json").asJava)
-//
-//    val outputJsonDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result")
-//    println("Output Json dataframe: " + outputJsonDf.show)
-//    outputJsonDf.first().getString(1) shouldEqual "Sampath"
-//
-//  }
-
-
-}
diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala
deleted file mode 100644 (file)
index b11a4f9..0000000
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.spark
-
-import java.io.{ByteArrayOutputStream, File}
-
-import org.apache.amaterasu.RunnersTests.RunnersLoadingTests
-import org.apache.amaterasu.common.dataobjects.ExecData
-import org.apache.amaterasu.common.execution.dependencies._
-import org.apache.amaterasu.common.runtime.Environment
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.amaterasu.utilities.TestNotifier
-import org.apache.spark.repl.amaterasu.runners.spark.SparkScalaRunner
-import org.apache.spark.sql.SparkSession
-import org.scalatest._
-
-
-
-import scala.collection.mutable.ListBuffer
-
-
-class SparkTestsSuite extends Suites(
-  new PySparkRunnerTests,
-  new RunnersLoadingTests,
-  new SparkSqlRunnerTests,
-  new SparkScalaRunnerTests
-) with BeforeAndAfterAll {
-
-  var env: Environment = _
-  var factory: ProvidersFactory = _
-  var spark: SparkSession = _
-
-  private def createTestMiniconda(): Unit = {
-    println(s"PATH: ${new File(".").getAbsolutePath}")
-    new File("miniconda/pkgs").mkdirs()
-  }
-
-  override def beforeAll(): Unit = {
-
-    // I can't apologise enough for this
-    val resources = new File(getClass.getResource("/spark_intp.py").getPath).getParent
-    val workDir = new File(resources).getParentFile.getParent
-
-    env = Environment()
-    env.workingDir = s"file://$workDir"
-
-    env.master = "local[1]"
-    if (env.configuration != null) env.configuration ++ "pysparkPath" -> "/usr/bin/python" else env.configuration = Map(
-      "pysparkPath" -> "/usr/bin/python",
-      "cwd" -> resources
-    )
-
-    val excEnv = Map[String, Any](
-      "PYTHONPATH" -> resources
-    )
-    createTestMiniconda()
-    env.configuration ++ "spark_exec_env" -> excEnv
-    factory = ProvidersFactory(ExecData(env,
-      Dependencies(ListBuffer.empty[Repo], List.empty[Artifact]),
-      PythonDependencies(List.empty[PythonPackage]),
-      Map(
-        "spark" -> Map.empty[String, Any],
-        "spark_exec_env" -> Map("PYTHONPATH" -> resources))),
-      "test",
-      new ByteArrayOutputStream(),
-      new TestNotifier(),
-      "test",
-      "localhost",
-      getClass.getClassLoader.getResource("amaterasu.properties").getPath)
-    spark = factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner].spark
-
-    this.nestedSuites.filter(s => s.isInstanceOf[RunnersLoadingTests]).foreach(s => s.asInstanceOf[RunnersLoadingTests].factory = factory)
-    this.nestedSuites.filter(s => s.isInstanceOf[PySparkRunnerTests]).foreach(s => s.asInstanceOf[PySparkRunnerTests].factory = factory)
-    this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].factory = factory)
-    this.nestedSuites.filter(s => s.isInstanceOf[SparkScalaRunnerTests]).foreach(s => s.asInstanceOf[SparkScalaRunnerTests].factory = factory)
-    this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].env = env)
-
-    super.beforeAll()
-  }
-
-  override def afterAll(): Unit = {
-    new File("miniconda").delete()
-    spark.stop()
-
-    super.afterAll()
-  }
-
-}
diff --git a/executor/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala b/executor/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
deleted file mode 100644 (file)
index 16cb97b..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.utilities
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-
-
-class TestNotifier extends Notifier with Logging {
-
-  override def info(msg: String): Unit = {
-    log.info(msg)
-  }
-
-  override def success(line: String): Unit = {
-    log.info(s"successfully executed line: $line")
-  }
-
-  override def error(line: String, msg: String): Unit = {
-    log.error(s"Error executing line: $line message: $msg")
-  }
-}
index 1828100..d3bcb7d 100644 (file)
@@ -250,10 +250,10 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
         val commands: List[String] = List(
           "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && ",
           s"/bin/bash spark/bin/load-spark-env.sh && ",
-          s"java -cp spark/jars/*:executor.jar:spark/conf/:${config.YARN.hadoopHomeDir}/conf/ " +
+          s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${config.YARN.hadoopHomeDir}/conf/ " +
             "-Xmx1G " +
             "-Dscala.usejavacp=true " +
-            "-Dhdp.version=2.6.1.0-129 " +
+            "-Dhdp.version=2.6.5.0-292 " +
             "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
             s"'${jobManager.jobId}' '${config.master}' '${actionData.name}' '${URLEncoder.encode(taskData, "UTF-8")}' '${URLEncoder.encode(execData, "UTF-8")}' '${actionData.id}-${container.getId.getContainerId}' '$address' " +
             s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
@@ -266,22 +266,37 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
         ctx.setCommands(commands)
         ctx.setTokens(allTokens)
 
+        val yarnJarPath = new Path(config.YARN.hdfsJarsPath)
+
+        //TODO Arun - Remove the hardcoding of the dist path
+        /*  val resources = mutable.Map[String, LocalResource]()
+          val binaryFileIter = fs.listFiles(new Path(s"${config.YARN.hdfsJarsPath}/dist"), false)
+          while (binaryFileIter.hasNext) {
+            val eachFile = binaryFileIter.next().getPath
+            resources (eachFile.getName) = setLocalResourceFromPath(fs.makeQualified(eachFile))
+          }
+          resources("log4j.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/log4j.properties")))
+          resources ("amaterasu.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/amaterasu.properties")))*/
+
         val resources = mutable.Map[String, LocalResource](
-          "executor.jar" -> executorJar,
-          "amaterasu.properties" -> propFile,
+          "executor.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/executor-${config.version}-all.jar"))),
+          "spark-runner.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runner-${config.version}-all.jar"))),
+          "spark-runtime.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runtime-${config.version}.jar"))),
+          "amaterasu.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/amaterasu.properties"))),
+          "log4j.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/log4j.properties"))),
           // TODO: Nadav/Eyal all of these should move to the executor resource setup
-          "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/Miniconda2-latest-Linux-x86_64.sh"))),
-          "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/codegen.py"))),
-          "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/runtime.py"))),
-          "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/spark-version-info.properties"))),
-          "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/spark_intp.py"))))
+          "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/miniconda.sh"))),
+          "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/codegen.py"))),
+          "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/runtime.py"))),
+          "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark-version-info.properties"))),
+          "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark_intp.py"))))
 
         val frameworkFactory = FrameworkProvidersFactory(env, config)
         val framework = frameworkFactory.getFramework(actionData.groupId)
 
         //adding the framework and executor resources
-        setupResources(framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
-        setupResources(s"${framework.getGroupIdentifier}/${actionData.typeId}", resources, s"${framework.getGroupIdentifier}-${actionData.typeId}")
+        setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
+        setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.typeId}", resources, s"${framework.getGroupIdentifier}-${actionData.typeId}")
 
         ctx.setLocalResources(resources)
 
@@ -327,9 +342,9 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
     ByteBuffer.wrap(dob.getData, 0, dob.getLength)
   }
 
-  private def setupResources(frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
+  private def setupResources(yarnJarPath: Path, frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
 
-    val sourcePath = Path.mergePaths(jarPath, new Path(s"/$resourcesPath"))
+    val sourcePath = Path.mergePaths(yarnJarPath, new Path(s"/$resourcesPath"))
 
     if (fs.exists(sourcePath)) {
 
index 18dbed9..e01ea42 100755 (executable)
@@ -126,9 +126,9 @@ if ! ls ${BASEDIR}/dist/spark*.tgz 1> /dev/null 2>&1; then
     #wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist
     wget http://apache.mirror.digitalpacific.com.au/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist
 fi
-if [ ! -f ${BASEDIR}/dist/Miniconda2-latest-Linux-x86_64.sh ]; then
+if [ ! -f ${BASEDIR}/dist/miniconda.sh ]; then
     echo "${bold}Fetching miniconda distributable ${NC}"
-    wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -P ${BASEDIR}/dist
+    wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -O ${BASEDIR}/dist/miniconda.sh
 fi
 cp ${BASEDIR}/amaterasu.properties ${BASEDIR}/dist
 eval $CMD | grep "===>"
index 8aa58f1..f6af18f 100755 (executable)
@@ -136,9 +136,9 @@ fi
 
 echo $CMD
 
-if [ ! -f ${BASEDIR}/dist/Miniconda2-latest-Linux-x86_64.sh ]; then
+if [ ! -f ${BASEDIR}/dist/miniconda.sh ]; then
     echo "${bold}Fetching miniconda distributable ${NC}"
-    wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -P ${BASEDIR}/dist
+    wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -O ${BASEDIR}/dist/miniconda.sh
 fi
 
 
index 0bd7afd..cb9e896 100755 (executable)
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
-zk=127.0.0.1
-version=0.2.0-incubating-rc4
-master=192.168.33.11
-user=root
+zk=192.168.1.90
+version=0.2.0-incubating-rc3
+master=192.168.1.90
+user=amaterasu
 mode=yarn
 webserver.port=8000
 webserver.root=dist
@@ -25,5 +25,9 @@ yarn.jarspath=hdfs:///apps/amaterasu
 spark.home=/usr/hdp/current/spark2-client
 #spark.home=/opt/cloudera/parcels/SPARK2-2.1.0.cloudera2-1.cdh5.7.0.p0.171658/lib/spark2
 yarn.hadoop.home.dir=/etc/hadoop
-spark.opts.spark.yarn.am.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
-spark.opts.spark.driver.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
+#spark.opts.spark.yarn.am.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
+#spark.opts.spark.driver.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
+spark.opts.spark.yarn.am.extraJavaOptions="-Dhdp.version=2.6.5.0-292"
+spark.opts.spark.driver.extraJavaOptions="-Dhdp.version=2.6.5.0-292"
+amaterasu.executor.extra.java.opts = "-Xmx1G -Dscala.usejavacp=true -Dhdp.version=2.6.5.0-292"
+
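
Note: the new amaterasu.executor.extra.java.opts entry duplicates the JVM flags that the ApplicationMaster command above still hardcodes (-Xmx1G, -Dscala.usejavacp=true, -Dhdp.version=2.6.5.0-292). A minimal sketch of how the container command could pick those flags up from configuration instead is shown below; the extraJavaOpts accessor is hypothetical and only stands in for however this property ends up being exposed, it is not the project's actual API.

    // Sketch only: `config` is the ApplicationMaster's loaded cluster configuration;
    // the accessor name is a placeholder for amaterasu.executor.extra.java.opts.
    val extraJavaOpts: String = config.executor.extraJavaOpts  // e.g. "-Xmx1G -Dscala.usejavacp=true -Dhdp.version=2.6.5.0-292"

    val command: String =
      s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${config.YARN.hadoopHomeDir}/conf/ " +
      s"$extraJavaOpts " +  // replaces the hardcoded -Xmx1G / -Dscala.usejavacp / -Dhdp.version flags
      "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher"
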
index 1056e01..c222795 100644 (file)
  * limitations under the License.
  */
 include 'leader'
-include 'executor'
+project(':leader')
+
 include 'common'
+project(':common')
+
+include 'executor'
+project(':executor')
+
 include 'sdk'
 findProject(':sdk')?.name = 'amaterasu-sdk'
 
+//Spark
+include 'spark-runner'
+project(':spark-runner').projectDir=file("frameworks/spark/runner")
+include 'spark-runtime'
+project(':spark-runtime').projectDir=file("frameworks/spark/runtime")
+