[CARBONDATA-3222]Fix dataload failure after creation of preaggregate datamap
authorshardul-cr7 <shardulsingh22@gmail.com>
Wed, 2 Jan 2019 09:17:34 +0000 (14:47 +0530)
committerkunal642 <kunalkapoor642@gmail.com>
Mon, 7 Jan 2019 12:25:46 +0000 (17:55 +0530)
on main table with long_string_columns

Dataload is gettling failed because child table properties are not getting
modified according to the parent table for long_string_columns.
This occurs only when long_string_columns is not specified in dmproperties
for preaggregate datamap but the datamap was getting created and data load
was failing. This PR is to avoid the dataload failure in this scenario.

This closes #3045

integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala

index c84ee92..ff202b6 100644 (file)
@@ -333,6 +333,37 @@ class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach wi
     sql(s"DROP DATAMAP IF EXISTS $datamapName ON TABLE $longStringTable")
   }
 
+  test("creating datamap with long string column selected and loading data should be success") {
+
+    sql(s"drop table if exists $longStringTable")
+    val datamapName = "pre_agg_dm"
+    sql(
+      s"""
+         | CREATE TABLE if not exists $longStringTable(
+         | id INT, name STRING, description STRING, address STRING, note STRING
+         | ) STORED BY 'carbondata'
+         | TBLPROPERTIES('LONG_STRING_COLUMNS'='description, note', 'SORT_COLUMNS'='name')
+         |""".stripMargin)
+
+    sql(
+      s"""
+         | CREATE DATAMAP $datamapName ON TABLE $longStringTable
+         | USING 'preaggregate'
+         | AS SELECT id,description,note,count(*) FROM $longStringTable
+         | GROUP BY id,description,note
+         |""".
+        stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $longStringTable
+         | OPTIONS('header'='false')
+       """.stripMargin)
+
+    checkAnswer(sql(s"select count(*) from $longStringTable"), Row(1000))
+    sql(s"drop table if exists $longStringTable")
+  }
+
   test("create table with varchar column and complex column") {
     sql("DROP TABLE IF EXISTS varchar_complex_table")
     sql("""
index bc03be2..4116ed6 100644 (file)
@@ -110,7 +110,29 @@ case class PreAggregateTableHelper(
     // Datamap table name and columns are automatically added prefix with parent table name
     // in carbon. For convenient, users can type column names same as the ones in select statement
     // when config dmproperties, and here we update column names with prefix.
-    val longStringColumn = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS)
+    // If longStringColumn is not present in dm properties then we take long_string_columns from
+    // the parent table.
+    var longStringColumn = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS)
+    if (longStringColumn.isEmpty) {
+      val longStringColumnInParents = parentTable.getTableInfo.getFactTable.getTableProperties
+        .asScala
+        .getOrElse(CarbonCommonConstants.LONG_STRING_COLUMNS, "").split(",").map(_.trim)
+      val varcharDatamapFields = scala.collection.mutable.ArrayBuffer.empty[String]
+      fieldRelationMap foreach (fields => {
+        val aggFunc = fields._2.aggregateFunction
+        val relationList = fields._2.columnTableRelationList
+        // check if columns present in datamap are long_string_col in parent table. If they are
+        // long_string_columns in parent, make them long_string_columns in datamap
+        if (aggFunc.isEmpty && relationList.size == 1 && longStringColumnInParents
+          .contains(relationList.head.head.parentColumnName)) {
+          varcharDatamapFields += relationList.head.head.parentColumnName
+        }
+      })
+      if (!varcharDatamapFields.isEmpty) {
+        longStringColumn = Option(varcharDatamapFields.mkString(","))
+      }
+    }
+
     if (longStringColumn != None) {
       val fieldNames = fields.map(_.column)
       val newLongStringColumn = longStringColumn.get.split(",").map(_.trim).map{ colName =>