[HOTFIX] SDV framework for presto cluster test suite
authorajantha-bhat <ajanthabhat@gmail.com>
Mon, 28 Jan 2019 04:44:53 +0000 (10:14 +0530)
committerravipesala <ravi.pesala@gmail.com>
Wed, 30 Jan 2019 09:26:15 +0000 (14:56 +0530)
[HOTFIX] SDV framework for presto cluster test suite
a) Added a suite for the Presto cluster test, with a sample test case in which Carbon Presto reads the store created by Spark.
b) When a single suite was selected for running, test cases of other modules (such as SDK, CLI and processing) were run as well. Fixed this by adding an sdvtest profile, which skips tests, to the modules that had this issue.

This closes #3111

17 files changed:
common/pom.xml
examples/flink/pom.xml
format/pom.xml
integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala
integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
integration/spark-common-cluster-test/pom.xml
integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PrestoSampleTestCase.scala [new file with mode: 0644]
integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
pom.xml
processing/pom.xml
store/sdk/pom.xml
streaming/pom.xml
tools/cli/pom.xml

index 85510af..14cd52f 100644 (file)
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 
 </project>
index 127b8fb..0819c12 100644 (file)
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 
 </project>
\ No newline at end of file
index 74760ab..7fdc6cc 100644 (file)
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 
 </project>
\ No newline at end of file
index 4360977..2735969 100644 (file)
@@ -79,7 +79,7 @@ class PrestoAllDataTypeLocalDictTest extends FunSuiteLike with BeforeAndAfterAll
     map.put("hive.metastore", "file")
     map.put("hive.metastore.catalog.dir", s"file://$storePath")
 
-    prestoServer.startServer(storePath, "testdb", map)
+    prestoServer.startServer("testdb", map)
     prestoServer.execute("drop table if exists testdb.testtable")
     prestoServer.execute("drop schema if exists testdb")
     prestoServer.execute("create schema testdb")
index 17490e4..205469c 100644 (file)
@@ -80,7 +80,7 @@ class PrestoAllDataTypeTest extends FunSuiteLike with BeforeAndAfterAll {
     map.put("hive.metastore", "file")
     map.put("hive.metastore.catalog.dir", s"file://$storePath")
 
-    prestoServer.startServer(storePath, "testdb", map)
+    prestoServer.startServer("testdb", map)
     prestoServer.execute("drop table if exists testdb.testtable")
     prestoServer.execute("drop schema if exists testdb")
     prestoServer.execute("create schema testdb")
index 6d17b8b..bdee4a1 100644 (file)
@@ -57,7 +57,7 @@ class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAf
     map.put("hive.metastore", "file")
     map.put("hive.metastore.catalog.dir", s"file://$storePath")
 
-    prestoServer.startServer(storePath, "sdk_output", map)
+    prestoServer.startServer("sdk_output", map)
   }
 
   override def afterAll(): Unit = {
index 0bde313..672e90f 100644 (file)
@@ -25,6 +25,7 @@ import scala.util.{Failure, Success, Try}
 
 import com.facebook.presto.Session
 import com.facebook.presto.execution.QueryIdGenerator
+import com.facebook.presto.jdbc.PrestoStatement
 import com.facebook.presto.metadata.SessionPropertyManager
 import com.facebook.presto.spi.`type`.TimeZoneKey.UTC_KEY
 import com.facebook.presto.spi.security.Identity
@@ -47,18 +48,17 @@ class PrestoServer {
   createSession
   lazy val queryRunner = new DistributedQueryRunner(createSession, 4, prestoProperties)
   var dbName : String = null
+  var statement : PrestoStatement = _
 
 
   /**
    * start the presto server
    *
-   * @param carbonStorePath the store path of carbon
    */
-  def startServer(carbonStorePath: String): Unit = {
+  def startServer(): Unit = {
 
     LOGGER.info("======== STARTING PRESTO SERVER ========")
-    val queryRunner: DistributedQueryRunner = createQueryRunner(
-      prestoProperties, carbonStorePath)
+    val queryRunner: DistributedQueryRunner = createQueryRunner(prestoProperties)
 
     LOGGER.info("STARTED SERVER AT :" + queryRunner.getCoordinator.getBaseUrl)
   }
@@ -66,25 +66,23 @@ class PrestoServer {
   /**
    * start the presto server
    *
-   * @param carbonStorePath the store path of carbon
    * @param dbName the database name, if not a default database
    */
-  def startServer(carbonStorePath: String, dbName: String, properties: util.Map[String, String]= new util.HashMap[String, String]()): Unit = {
+  def startServer(dbName: String, properties: util.Map[String, String] = new util.HashMap[String, String]()): Unit = {
 
     this.dbName = dbName
     carbonProperties.putAll(properties)
     LOGGER.info("======== STARTING PRESTO SERVER ========")
-    val queryRunner: DistributedQueryRunner = createQueryRunner(
-      prestoProperties, carbonStorePath)
-
+    val queryRunner: DistributedQueryRunner = createQueryRunner(prestoProperties)
+    val conn: Connection = createJdbcConnection(dbName)
+    statement = conn.createStatement().asInstanceOf[PrestoStatement]
     LOGGER.info("STARTED SERVER AT :" + queryRunner.getCoordinator.getBaseUrl)
   }
 
   /**
    * Instantiates the Presto Server to connect with the Apache CarbonData
    */
-  private def createQueryRunner(extraProperties: util.Map[String, String],
-      carbonStorePath: String): DistributedQueryRunner = {
+  private def createQueryRunner(extraProperties: util.Map[String, String]) = {
     Try {
       queryRunner.installPlugin(new CarbondataPlugin)
       val carbonProperties = ImmutableMap.builder[String, String]
@@ -105,6 +103,7 @@ class PrestoServer {
    */
   def stopServer(): Unit = {
     queryRunner.close()
+    statement.close()
     LOGGER.info("***** Stopping The Server *****")
   }
 
@@ -117,9 +116,7 @@ class PrestoServer {
   def executeQuery(query: String): List[Map[String, Any]] = {
 
     Try {
-      val conn: Connection = createJdbcConnection(dbName)
       LOGGER.info(s"***** executing the query ***** \n $query")
-      val statement = conn.createStatement()
       val result: ResultSet = statement.executeQuery(query)
       convertResultSetToList(result)
     } match {
@@ -131,11 +128,8 @@ class PrestoServer {
   }
 
   def execute(query: String) = {
-
     Try {
-      val conn: Connection = createJdbcConnection(dbName)
       LOGGER.info(s"***** executing the query ***** \n $query")
-      val statement = conn.createStatement()
       statement.execute(query)
     } match {
       case Success(result) => result
index a3ec125..9fe89cc 100644 (file)
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.facebook.presto</groupId>
+      <artifactId>presto-jdbc</artifactId>
+      <version>0.210</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.antlr</groupId>
+          <artifactId>antlr4-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>
             <java.awt.headless>true</java.awt.headless>
             <spark.master.url>${spark.master.url}</spark.master.url>
             <hdfs.url>${hdfs.url}</hdfs.url>
+            <presto.jdbc.url>${presto.jdbc.url}</presto.jdbc.url>
+            <spark.hadoop.hive.metastore.uris>${spark.hadoop.hive.metastore.uris}</spark.hadoop.hive.metastore.uris>
             <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
           </systemProperties>
         </configuration>
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PrestoSampleTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PrestoSampleTestCase.scala
new file mode 100644 (file)
index 0000000..336f8bc
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cluster.sdv.generated
+
+import org.apache.spark.sql.common.util._
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Sample SDV test case: creates and loads a carbon table through spark sql and, when the
+ * "spark.master.url" system property is set, reads the same table back through the presto
+ * JDBC helper in QueryTest.PrestoQueryTest.
+ */
+class PrestoSampleTestCase extends QueryTest with BeforeAndAfterAll {
+
+  // The presto part of the test is only exercised when spark.master.url is set.
+  private def prestoEnabled: Boolean = System.getProperty("spark.master.url") != null
+
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sample_table")
+    if (prestoEnabled) {
+      QueryTest.PrestoQueryTest.initJdbcConnection("default")
+    }
+  }
+
+  test("test read spark store from presto ") {
+    sql("show tables").show(false)
+
+    sql("DROP TABLE IF EXISTS sample_table")
+    sql("CREATE TABLE sample_table (name string) STORED BY 'carbondata'")
+    sql("insert into sample_table select 'ajantha'")
+    sql("select * from sample_table ").show(200, false)
+    sql("describe formatted sample_table ").show(200, false)
+    if (prestoEnabled) {
+      // supports only running through cluster
+      val actualResult: List[Map[String, Any]] = QueryTest.PrestoQueryTest
+        .executeQuery("select * from sample_table")
+      // Print every returned row instead of indexing row 0: indexing would throw on an
+      // empty result before the assertion below could report the mismatch.
+      println("ans---------" + actualResult.mkString(", "))
+      val expectedResult: List[Map[String, Any]] = List(Map("name" -> "ajantha"))
+      assert(actualResult.toString() == expectedResult.toString())
+    }
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sample_table")
+    // The JDBC statement only exists when beforeAll opened it.
+    if (prestoEnabled) {
+      QueryTest.PrestoQueryTest.closeJdbcConnection()
+    }
+  }
+
+}
\ No newline at end of file
index f2ae2cb..5367e0d 100644 (file)
@@ -187,3 +187,19 @@ class SDVSuites4 extends Suites with BeforeAndAfterAll {
     println("---------------- Stopped spark -----------------")
   }
 }
+
+/**
+ * Suite class for presto tests
+ */
+class SDVSuites5 extends Suites with BeforeAndAfterAll {
+
+  // Test cases that make up this suite.
+  val suites = List(new PrestoSampleTestCase)
+
+  override val nestedSuites = suites.toIndexedSeq
+
+  // Stops the shared TestQueryExecutor instance; the two log lines bracket the shutdown.
+  override protected def afterAll(): Unit = {
+    println("---------------- Stopping spark -----------------")
+    TestQueryExecutor.INSTANCE.stop()
+    println("---------------- Stopped spark -----------------")
+  }
+}
\ No newline at end of file
index 39beae1..9d4fe79 100644 (file)
 package org.apache.spark.sql.common.util
 
 import java.io.{ObjectInputStream, ObjectOutputStream}
-import java.util.{Locale, TimeZone}
+import java.sql.{Connection, DriverManager, ResultSet}
+import java.util.{Locale, Properties}
 
-import org.apache.carbondata.common.logging.LogServiceFactory
 import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
 
+import com.facebook.presto.jdbc.{PrestoConnection, PrestoStatement}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.command.LoadDataCommand
 import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.apache.spark.sql.test.{ResourceRegisterAndCopier, TestQueryExecutor}
-import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.scalatest.Suite
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.commons.lang.StringUtils
 
 class QueryTest extends PlanTest with Suite {
 
@@ -154,8 +156,6 @@ object QueryTest {
     }
   }
 
-  import java.text.DecimalFormat
-
   /**
    * Runs the plan and makes sure the answer matches the expected result.
    * If there was exception during the execution or the contents of the DataFrame does not
@@ -220,4 +220,87 @@ object QueryTest {
 
     return None
   }
+
+  /**
+   * JDBC helper for running queries against presto from the test suites.
+   * Call initJdbcConnection once before executeQuery and closeJdbcConnection when done.
+   */
+  object PrestoQueryTest {
+
+    // Shared statement; stays null until initJdbcConnection has been called.
+    var statement : PrestoStatement = _
+
+    /**
+     * Opens the JDBC connection and creates the shared statement.
+     * The host:port is read from the "presto.jdbc.url" system property and falls back
+     * to "localhost:8086" when the property is not set.
+     *
+     * @param dbName schema to connect to; "default" is used when empty
+     */
+    def initJdbcConnection(dbName: String): Unit = {
+      // Release a statement left over from an earlier call so it is not leaked.
+      closeJdbcConnection()
+      val url = Option(System.getProperty("presto.jdbc.url")).getOrElse("localhost:8086")
+      val conn: Connection = createJdbcConnection(dbName, url)
+      statement = conn.createStatement().asInstanceOf[PrestoStatement]
+    }
+
+    /**
+     * Closes the shared statement and the connection it was created from.
+     * Does nothing when the connection was never initialised or is already closed.
+     */
+    def closeJdbcConnection(): Unit = {
+      if (statement != null && !statement.isClosed) {
+        // Fetch the connection first: it cannot be obtained from a closed statement.
+        val conn = statement.getConnection
+        try statement.close() finally conn.close()
+      }
+    }
+
+    /**
+     * execute the query on the shared statement
+     *
+     * @param query the sql text to run
+     * @return one map per row, keyed by column name
+     */
+    def executeQuery(query: String): List[Map[String, Any]] = {
+      if (statement == null) {
+        throw new IllegalStateException(
+          "initJdbcConnection must be called before executeQuery")
+      }
+      val result: ResultSet = statement.executeQuery(query)
+      // The rows are fully materialised into a List, so the result set can be closed here.
+      try convertResultSetToList(result) finally result.close()
+    }
+
+    /**
+     * Creates a JDBC Client to connect CarbonData to Presto
+     *
+     * @param dbName schema name; "default" is used when empty
+     * @param url host:port part of the JDBC url
+     * @return the opened connection
+     */
+    private def createJdbcConnection(dbName: String, url: String): Connection = {
+      val jdbcDriver = "com.facebook.presto.jdbc.PrestoDriver"
+      val schema = if (StringUtils.isEmpty(dbName)) "default" else dbName
+      val dbUrl = "jdbc:presto://" + url + "/carbondata/" + schema
+      val properties = new Properties
+      // The database Credentials
+      properties.setProperty("user", "test")
+
+      // Register JDBC driver
+      Class.forName(jdbcDriver)
+      // Open a connection
+      DriverManager.getConnection(dbUrl, properties)
+    }
+
+    /**
+     * convert result set into scala list of map
+     * each map represents a row
+     *
+     * @param queryResult the result set to drain
+     * @return the remaining rows, in the order the result set yields them
+     */
+    private def convertResultSetToList(queryResult: ResultSet): List[Map[String, Any]] = {
+      val metadata = queryResult.getMetaData
+      val colNames = (1 to metadata.getColumnCount) map metadata.getColumnName
+      Iterator.continually(buildMapFromQueryResult(queryResult, colNames)).takeWhile(_.isDefined)
+        .map(_.get).toList
+    }
+
+    /**
+     * Advances the result set by one row.
+     *
+     * @return the row as a column name to value map, or None when no row is left
+     */
+    private def buildMapFromQueryResult(queryResult: ResultSet,
+        colNames: Seq[String]): Option[Map[String, Any]] = {
+      if (queryResult.next()) {
+        Some(colNames.map(name => name -> queryResult.getObject(name)).toMap)
+      } else {
+        None
+      }
+    }
+  }
+
 }
index eaef9c1..0729713 100644 (file)
@@ -59,6 +59,11 @@ object Spark2TestQueryExecutor {
     FileFactory.getConfiguration.
       set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
   }
+
+  if (System.getProperty("spark.hadoop.hive.metastore.uris") != null) {
+    conf.set("spark.hadoop.hive.metastore.uris",
+      System.getProperty("spark.hadoop.hive.metastore.uris"))
+  }
   val metaStoreDB = s"$integrationPath/spark-common-cluster-test/target"
   val spark = SparkSession
     .builder().config(conf)
diff --git a/pom.xml b/pom.xml
index 1d69df3..6e3bde2 100644 (file)
--- a/pom.xml
+++ b/pom.xml
     <dev.path>${basedir}/dev</dev.path>
     <spark.master.url>local[2]</spark.master.url>
     <hdfs.url>local</hdfs.url>
+    <presto.jdbc.url>localhost:8086</presto.jdbc.url>
+    <spark.hadoop.hive.metastore.uris>thrift://localhost:8086</spark.hadoop.hive.metastore.uris>
     <suite.name>org.apache.carbondata.cluster.sdv.suite.SDVSuites</suite.name>
     <script.exetension>.sh</script.exetension>
     <carbon.hive.based.metastore>false</carbon.hive.based.metastore>
index ef739ee..adbac00 100644 (file)
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 
 </project>
index 3a84f0a..272332d 100644 (file)
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 </project>
index 0c90750..08dc2ce 100644 (file)
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 </project>
index 71ea2c6..738ce18 100644 (file)
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
 </project>