Fixed SparkScalaRunnerTests failures
authorShad Amez <shad.amezng@gmail.com>
Tue, 22 May 2018 12:15:26 +0000 (17:45 +0530)
committerShad Amez <shad.amezng@gmail.com>
Tue, 22 May 2018 12:15:26 +0000 (17:45 +0530)
executor/src/test/resources/simple-spark.scala
executor/src/test/resources/step-2.scala
executor/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet [new file with mode: 0644]
executor/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet [new file with mode: 0644]
executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala

index 797235d..a11a458 100755 (executable)
@@ -1,11 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 import org.apache.amaterasu.executor.runtime.AmaContext
-import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+val data = Seq(1,3,4,5,6)
 
-val data = Array(1, 2, 3, 4, 5)
 
 val sc = AmaContext.sc
 val rdd = sc.parallelize(data)
-val sqlContext = AmaContext.sqlContext
+val sqlContext = AmaContext.spark
 
 import sqlContext.implicits._
 val x: DataFrame = rdd.toDF()
index 34ad839..a3d034c 100755 (executable)
@@ -1,7 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 import org.apache.amaterasu.executor.runtime.AmaContext
 
-val oddRdd = AmaContext.getRDD[Int]("start", "rdd").filter(x=>x/2 == 0)
-oddRdd.take(5).foreach(println)
 
-val highNoDf = AmaContext.getDataFrame("start", "x").where("_1 > 3")
+val highNoDf = AmaContext.getDataFrame("start", "x").where("age > 20")
 highNoDf.show
diff --git a/executor/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
new file mode 100644 (file)
index 0000000..e1b0d2e
Binary files /dev/null and b/executor/src/test/resources/tmp/job/start/x/part-r-00000-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet differ
diff --git a/executor/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet
new file mode 100644 (file)
index 0000000..d807ba9
Binary files /dev/null and b/executor/src/test/resources/tmp/job/start/x/part-r-00001-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet differ
index d41feea..68c06ce 100755 (executable)
@@ -1,54 +1,56 @@
-//package org.apache.amaterasu.spark
-//
-//import java.io.File
-//
-//import org.apache.amaterasu.common.runtime._
-//import org.apache.amaterasu.common.configuration.ClusterConfig
-//import org.apache.amaterasu.utilities.TestNotifier
-//
-//import scala.collection.JavaConverters._
-//import org.apache.commons.io.FileUtils
-//import java.io.ByteArrayOutputStream
-//
-//import org.apache.spark.SparkConf
-//import org.apache.spark.repl.Main
-//import org.apache.spark.repl.amaterasu.runners.spark.{SparkRunnerHelper, SparkScalaRunner}
-//import org.apache.spark.sql.SparkSession
-//import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-//
-//class SparkScalaRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
-//
-//  var runner: SparkScalaRunner = _
-//
-//  override protected def beforeAll(): Unit = {
-//
-//    FileUtils.deleteQuietly(new File("/tmp/job_5/"))
-//
-//    val env = Environment()
-//    env.workingDir = "file:///tmp"
-//    env.master = "local[*]"
-//
-//
-//    val spark = SparkRunnerHelper.createSpark(env, "job_5", Seq.empty[String], Map.empty)
-//
-//
-//    val notifier = new TestNotifier()
-//    val strm = new ByteArrayOutputStream()
-//    runner = SparkScalaRunner(env, "job_5", spark, strm, notifier, Seq.empty[String])
-//    super.beforeAll()
-//  }
-//
-//  "SparkScalaRunner" should "execute the simple-spark.scala" in {
-//
-//    val script = getClass.getResource("/simple-spark.scala").getPath
-//    runner.executeSource(script, "start", Map.empty[String, String].asJava)
-//
-//  }
-//
-//  "SparkScalaRunner" should "execute step-2.scala and access data from simple-spark.scala" in {
-//
-//    val script = getClass.getResource("/step-2.scala").getPath
-//    runner.executeSource(script, "cont", Map.empty[String, String].asJava)
-//
-//  }
-//}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.spark
+
+
+import scala.collection.JavaConverters._
+import org.apache.amaterasu.executor.common.executors.ProvidersFactory
+import org.apache.amaterasu.executor.runtime.AmaContext
+import org.apache.spark.repl.amaterasu.runners.spark.SparkScalaRunner
+import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
+
+import scala.io.Source
+
+@DoNotDiscover
+class SparkScalaRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
+  var factory: ProvidersFactory = _
+  var runner: SparkScalaRunner = _
+
+
+  "SparkScalaRunner" should "execute the simple-spark.scala" in {
+
+
+    val sparkRunner =factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
+    val script = getClass.getResource("/simple-spark.scala").getPath
+    val sourceCode = Source.fromFile(script).getLines().mkString("\n")
+    sparkRunner.executeSource(sourceCode, "start", Map.empty[String, String].asJava)
+
+  }
+
+  "SparkScalaRunner" should "execute step-2.scala and access data from simple-spark.scala" in {
+
+    val sparkRunner =factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
+    val script = getClass.getResource("/step-2.scala").getPath
+    sparkRunner.env.workingDir = s"${getClass.getResource("/tmp").getPath}"
+    AmaContext.init(sparkRunner.spark,"job",sparkRunner.env)
+    val sourceCode = Source.fromFile(script).getLines().mkString("\n")
+    sparkRunner.executeSource(sourceCode, "cont", Map.empty[String, String].asJava)
+
+  }
+
+
+}
index 8a1e549..b11a4f9 100644 (file)
@@ -36,7 +36,9 @@ import scala.collection.mutable.ListBuffer
 class SparkTestsSuite extends Suites(
   new PySparkRunnerTests,
   new RunnersLoadingTests,
-  new SparkSqlRunnerTests) with BeforeAndAfterAll {
+  new SparkSqlRunnerTests,
+  new SparkScalaRunnerTests
+) with BeforeAndAfterAll {
 
   var env: Environment = _
   var factory: ProvidersFactory = _
@@ -84,9 +86,9 @@ class SparkTestsSuite extends Suites(
     this.nestedSuites.filter(s => s.isInstanceOf[RunnersLoadingTests]).foreach(s => s.asInstanceOf[RunnersLoadingTests].factory = factory)
     this.nestedSuites.filter(s => s.isInstanceOf[PySparkRunnerTests]).foreach(s => s.asInstanceOf[PySparkRunnerTests].factory = factory)
     this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].factory = factory)
+    this.nestedSuites.filter(s => s.isInstanceOf[SparkScalaRunnerTests]).foreach(s => s.asInstanceOf[SparkScalaRunnerTests].factory = factory)
     this.nestedSuites.filter(s => s.isInstanceOf[SparkSqlRunnerTests]).foreach(s => s.asInstanceOf[SparkSqlRunnerTests].env = env)
 
-
     super.beforeAll()
   }