Merge branch 'master' into version-0.2.0-incubating-rc3
author	Yaniv Rodenski <roadan@gmail.com>
Sat, 9 Jun 2018 03:47:49 +0000 (13:47 +1000)
committer	GitHub <noreply@github.com>
Sat, 9 Jun 2018 03:47:49 +0000 (13:47 +1000)
executor/src/test/resources/simple-spark.scala
executor/src/test/resources/step-2.scala
executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala

@@@ -1,23 -1,9 +1,24 @@@
 -
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
  import org.apache.amaterasu.executor.runtime.AmaContext
- import org.apache.spark.sql.{DataFrame, SaveMode}
+ import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
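+ // sample data, parallelized into an RDD below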
+ val data = Seq(1, 3, 4, 5, 6)
  
- val data = Array(1, 2, 3, 4, 5)
  
  val sc = AmaContext.sc
  val rdd = sc.parallelize(data)
@@@ -1,23 -1,5 +1,21 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
  import org.apache.amaterasu.executor.runtime.AmaContext
  
- val oddRdd = AmaContext.getRDD[Int]("start", "rdd").filter(x=>x/2 == 0)
- oddRdd.take(5).foreach(println)
  
- val highNoDf = AmaContext.getDataFrame("start", "x").where("_1 > 3")
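+ // read the DataFrame the "start" step persisted via AmaContext and filter it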
+ val highNoDf = AmaContext.getDataFrame("start", "x").where("age > 20")
  highNoDf.show
diff --cc executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala
  * See the License for the specific language governing permissions and
   * limitations under the License.
   */
- //package org.apache.amaterasu.spark
- //
- //import java.io.File
- //
- //import org.apache.amaterasu.common.runtime._
- //import org.apache.amaterasu.common.configuration.ClusterConfig
- //import org.apache.amaterasu.utilities.TestNotifier
- //
- //import scala.collection.JavaConverters._
- //import org.apache.commons.io.FileUtils
- //import java.io.ByteArrayOutputStream
- //
- //import org.apache.spark.SparkConf
- //import org.apache.spark.repl.Main
- //import org.apache.spark.repl.amaterasu.runners.spark.{SparkRunnerHelper, SparkScalaRunner}
- //import org.apache.spark.sql.SparkSession
- //import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
- //
- //class SparkScalaRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
- //
- //  var runner: SparkScalaRunner = _
- //
- //  override protected def beforeAll(): Unit = {
- //
- //    FileUtils.deleteQuietly(new File("/tmp/job_5/"))
- //
- //    val env = Environment()
- //    env.workingDir = "file:///tmp"
- //    env.master = "local[*]"
- //
- //
- //    val spark = SparkRunnerHelper.createSpark(env, "job_5", Seq.empty[String], Map.empty)
- //
- //
- //    val notifier = new TestNotifier()
- //    val strm = new ByteArrayOutputStream()
- //    runner = SparkScalaRunner(env, "job_5", spark, strm, notifier, Seq.empty[String])
- //    super.beforeAll()
- //  }
- //
- //  "SparkScalaRunner" should "execute the simple-spark.scala" in {
- //
- //    val script = getClass.getResource("/simple-spark.scala").getPath
- //    runner.executeSource(script, "start", Map.empty[String, String].asJava)
- //
- //  }
- //
- //  "SparkScalaRunner" should "execute step-2.scala and access data from simple-spark.scala" in {
- //
- //    val script = getClass.getResource("/step-2.scala").getPath
- //    runner.executeSource(script, "cont", Map.empty[String, String].asJava)
- //
- //  }
- //}
+ package org.apache.amaterasu.spark
+ import scala.collection.JavaConverters._
+ import org.apache.amaterasu.executor.common.executors.ProvidersFactory
+ import org.apache.amaterasu.executor.runtime.AmaContext
+ import org.apache.spark.repl.amaterasu.runners.spark.SparkScalaRunner
+ import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers}
+ import scala.io.Source
+ @DoNotDiscover
+ class SparkScalaRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll {
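+   // both vars are expected to be set by the enclosing suite; @DoNotDiscover
+   // keeps ScalaTest from picking this class up on its own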
+   var factory: ProvidersFactory = _
+   var runner: SparkScalaRunner = _
+   "SparkScalaRunner" should "execute the simple-spark.scala" in {
+     val sparkRunner = factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
+     val script = getClass.getResource("/simple-spark.scala").getPath
+     val sourceCode = Source.fromFile(script).getLines().mkString("\n")
+     sparkRunner.executeSource(sourceCode, "start", Map.empty[String, String].asJava)
+   }
+   "SparkScalaRunner" should "execute step-2.scala and access data from simple-spark.scala" in {
+     val sparkRunner = factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
+     val script = getClass.getResource("/step-2.scala").getPath
+     sparkRunner.env.workingDir = getClass.getResource("/tmp").getPath
+     AmaContext.init(sparkRunner.spark, "job", sparkRunner.env)
+     val sourceCode = Source.fromFile(script).getLines().mkString("\n")
+     sparkRunner.executeSource(sourceCode, "cont", Map.empty[String, String].asJava)
+   }
 -}
++}