[CARBONDATA-3230] Add alter test case for datasource
authorxubo245 <xubo29@huawei.com>
Wed, 26 Dec 2018 04:03:01 +0000 (12:03 +0800)
committerravipesala <ravi.pesala@gmail.com>
Tue, 8 Jan 2019 07:32:43 +0000 (13:02 +0530)
[CARBONDATA-3230] Add ALTER test case with datasource for using parquet and carbon

1.add column. => carbon and parquet don't support, limit from Spark
2.drop column => carbon doesn't support in sql, limit from Spark, but using DF is ok; parquet use DF is ok, but sql doesn't support iy.
3.rename column =„Äč carbon and parquet support it.
4.change datatype of column => carbon parquet doesn't support, limit from Spark, spark only support change comment

This closes #3024

integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala

index 026c5ca..f89c1a8 100644 (file)
@@ -26,6 +26,7 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.carbondata.datasource.TestUtil._
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField => SparkStructField, StructType}
 import org.apache.spark.util.SparkUtil
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
@@ -77,6 +78,332 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     spark.sql("drop table if exists testformat")
   }
 
+  test("test add columns for table of using carbon with sql") {
+    // TODO: should support add columns for carbon dataSource table
+    // Limit from spark
+    import spark.implicits._
+    import spark._
+    try {
+      val df = spark.sparkContext.parallelize(1 to 10)
+        .map(x => ("a" + x % 10, "b", x))
+        .toDF("c1", "c2", "number")
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS carbon_table")
+      // Saves dataFrame to carbon file
+      df.write
+        .format("parquet").saveAsTable("test_parquet")
+      sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number INT) USING carbon")
+      sql("INSERT INTO carbon_table SELECT * FROM test_parquet")
+      TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"),
+        sql("SELECT * FROM test_parquet WHERE c1='a1'"))
+      if (!sparkContext.version.startsWith("2.1")) {
+        val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+        DataMapStoreManager.getInstance()
+          .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
+        assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+      }
+      assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
+      sql("ALTER TABLE carbon_table ADD COLUMNS (a1 INT, b1 STRING) ")
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("ALTER ADD COLUMNS does not support datasource table with type carbon."))
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS carbon_table")
+    }
+  }
+
+  test("test add columns for table of using carbon with DF") {
+    import spark.implicits._
+    import spark._
+    try {
+      val df = spark.sparkContext.parallelize(1 to 10)
+        .map(x => ("a" + x % 10, "b", x))
+        .toDF("c1", "c2", "number")
+      sql("DROP TABLE IF EXISTS carbon_table")
+      // Saves dataFrame to carbon file
+      df.write
+        .format("carbon").saveAsTable("carbon_table")
+      val customSchema = StructType(Array(
+        SparkStructField("c1", StringType),
+        SparkStructField("c2", StringType),
+        SparkStructField("number", IntegerType)))
+
+      val carbonDF = spark.read
+        .format("carbon")
+        .option("tableName", "carbon_table")
+        .schema(customSchema)
+        .load()
+
+      assert(carbonDF.schema.map(_.name) === Seq("c1", "c2", "number"))
+      val carbonDF2 = carbonDF.drop("c1")
+      assert(carbonDF2.schema.map(_.name) === Seq("c2", "number"))
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        assert(false)
+    } finally {
+      sql("DROP TABLE IF EXISTS carbon_table")
+    }
+  }
+
+  test("test drop columns for table of using carbon") {
+    // TODO: should support drop columns for carbon dataSource table
+    // Limit from spark
+    import spark.implicits._
+    import spark._
+    try {
+      val df = spark.sparkContext.parallelize(1 to 10)
+        .map(x => ("a" + x % 10, "b", x))
+        .toDF("c1", "c2", "number")
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS carbon_table")
+      // Saves dataFrame to carbon file
+      df.write
+        .format("parquet").saveAsTable("test_parquet")
+      sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number INT) USING carbon")
+      sql("INSERT INTO carbon_table SELECT * FROM test_parquet")
+      TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"),
+        sql("SELECT * FROM test_parquet WHERE c1='a1'"))
+      if (!sparkContext.version.startsWith("2.1")) {
+        val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+        DataMapStoreManager.getInstance()
+          .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
+        assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+      }
+      assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
+      sql("ALTER TABLE carbon_table drop COLUMNS (a1 INT, b1 STRING) ")
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("mismatched input 'COLUMNS' expecting"))
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS carbon_table")
+    }
+  }
+
+  test("test rename table name for table of using carbon") {
+    import spark.implicits._
+    import spark._
+    try {
+      val df = spark.sparkContext.parallelize(1 to 10)
+        .map(x => ("a" + x % 10, "b", x))
+        .toDF("c1", "c2", "number")
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS carbon_table")
+      sql("DROP TABLE IF EXISTS carbon_table2")
+      // Saves dataFrame to carbon file
+      df.write
+        .format("parquet").saveAsTable("test_parquet")
+      sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number INT) USING carbon")
+      sql("INSERT INTO carbon_table SELECT * FROM test_parquet")
+      TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"),
+        sql("SELECT * FROM test_parquet WHERE c1='a1'"))
+      if (!sparkContext.version.startsWith("2.1")) {
+        val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+        DataMapStoreManager.getInstance()
+          .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
+        assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+      }
+      assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
+      sql("ALTER TABLE carbon_table RENAME TO carbon_table2 ")
+      checkAnswer(sql("SELECT COUNT(*) FROM carbon_table2"), Seq(Row(10)));
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        assert(false)
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS carbon_table")
+    }
+  }
+
+  test("test change data type for table of using carbon") {
+    //TODO: Limit from spark
+    import spark.implicits._
+    import spark._
+    try {
+      val df = spark.sparkContext.parallelize(1 to 10)
+        .map(x => ("a" + x % 10, "b", x))
+        .toDF("c1", "c2", "number")
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS carbon_table")
+      sql("DROP TABLE IF EXISTS carbon_table2")
+      // Saves dataFrame to carbon file
+      df.write
+        .format("parquet").saveAsTable("test_parquet")
+      sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number decimal(8,2)) USING carbon")
+      sql("INSERT INTO carbon_table SELECT * FROM test_parquet")
+      TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"),
+        sql("SELECT * FROM test_parquet WHERE c1='a1'"))
+      if (!sparkContext.version.startsWith("2.1")) {
+        val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+        DataMapStoreManager.getInstance()
+          .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
+        assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+      }
+      assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
+      sql("ALTER TABLE carbon_table change number number decimal(9,4)")
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("ALTER TABLE CHANGE COLUMN is not supported for changing column"))
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS carbon_table")
+    }
+  }
+
+  test("test add columns for table of using parquet") {
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x))
+      .toDF("c1", "c2", "number")
+    import spark._
+    try {
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS test_parquet2")
+      df.write
+        .format("parquet").saveAsTable("test_parquet")
+      sql("ALTER TABLE test_parquet ADD COLUMNS(a1 INT, b1 STRING) ")
+      sql("INSERT INTO test_parquet VALUES('Bob','xu',12,1,'parquet')")
+      TestUtil.checkAnswer(sql("SELECT COUNT(*) FROM test_parquet"), Seq(Row(11)))
+
+      sql("DROP TABLE IF EXISTS test_parquet2")
+      sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
+      sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
+      sql("ALTER TABLE test_parquet2 ADD COLUMNS (a1 INT, b1 STRING) ")
+      sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12,1,'parquet')")
+      TestUtil.checkAnswer(sql("SELECT COUNT(*) FROM test_parquet2"), Seq(Row(2)))
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        assert(false)
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS test_parquet2")
+    }
+  }
+
+  test("test drop columns for table of using parquet") {
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x))
+      .toDF("c1", "c2", "number")
+    import spark._
+
+    sql("DROP TABLE IF EXISTS test_parquet")
+    sql("DROP TABLE IF EXISTS test_parquet2")
+    df.write
+      .format("parquet").saveAsTable("test_parquet")
+
+    val df2 = df.drop("c1")
+
+    assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
+    assert(df2.schema.map(_.name) === Seq("c2", "number"))
+
+    try {
+      sql("ALTER TABLE test_parquet DROP COLUMNS(c1)")
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("mismatched input 'COLUMNS' expecting"))
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+    }
+
+    sql("DROP TABLE IF EXISTS test_parquet2")
+    sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
+    sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
+    try {
+      sql("ALTER TABLE test_parquet2 DROP COLUMNS (a1 INT, b1 STRING) ")
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("mismatched input 'COLUMNS' expecting"))
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet2")
+    }
+  }
+
+  test("test rename table name for table of using parquet") {
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x))
+      .toDF("c1", "c2", "number")
+    import spark._
+
+    sql("DROP TABLE IF EXISTS test_parquet")
+    sql("DROP TABLE IF EXISTS test_parquet2")
+    sql("DROP TABLE IF EXISTS test_parquet3")
+    sql("DROP TABLE IF EXISTS test_parquet22")
+    df.write
+      .format("parquet").saveAsTable("test_parquet")
+
+    try {
+      sql("ALTER TABLE test_parquet rename to test_parquet3")
+      checkAnswer(sql("SELECT COUNT(*) FROM test_parquet3"), Seq(Row(10)));
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        assert(false)
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS test_parquet3")
+    }
+
+    sql("DROP TABLE IF EXISTS test_parquet2")
+    sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
+    sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
+    try {
+      sql("ALTER TABLE test_parquet2 rename to test_parquet22")
+      checkAnswer(sql("SELECT COUNT(*) FROM test_parquet22"), Seq(Row(1)));
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        assert(false)
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet2")
+      sql("DROP TABLE IF EXISTS test_parquet22")
+    }
+  }
+
+  test("test change data type for table of using parquet") {
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x))
+      .toDF("c1", "c2", "number")
+    import spark._
+
+    sql("DROP TABLE IF EXISTS test_parquet")
+    sql("DROP TABLE IF EXISTS test_parquet2")
+    df.write
+      .format("parquet").saveAsTable("test_parquet")
+    try {
+      sql("ALTER TABLE test_parquet CHANGE number number long")
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("ALTER TABLE CHANGE COLUMN is not supported for changing column"))
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+    }
+    sql("DROP TABLE IF EXISTS test_parquet2")
+    sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
+    sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
+    try {
+      sql("ALTER TABLE test_parquet2 CHANGE number number long")
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("ALTER TABLE CHANGE COLUMN is not supported for changing column"))
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet2")
+    }
+  }
+
   test("test read with df write") {
     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
     import spark.implicits._