[CARBONDATA-3189] Fix PreAggregate Datamap Issue
authorShubh18s <singh18shubhdeep@gmail.com>
Thu, 20 Dec 2018 11:17:32 +0000 (16:47 +0530)
committerkumarvishal09 <kumarvishal1802@gmail.com>
Mon, 7 Jan 2019 08:43:37 +0000 (14:13 +0530)
Problem -
Load and Select query was failing on table with preaggregate datamap.

Cause -
Previously if query on datamap was not enabled in thread, there was no check afterwards.

Solution -
After checking whether thread param for Direct Query On Datamap is enable. If not enable, we check in session params and then global.

This closes #3010

core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
docs/configuration-parameters.md
integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala

index 8d0a4d9..c1ef940 100644 (file)
@@ -1450,12 +1450,6 @@ public final class CarbonCommonConstants {
   public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "false";
 
   @CarbonProperty
-  public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP =
-      "carbon.query.validate.direct.query.on.datamap";
-
-  public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "true";
-
-  @CarbonProperty
   public static final String CARBON_SHOW_DATAMAPS = "carbon.query.show.datamaps";
 
   public static final String CARBON_SHOW_DATAMAPS_DEFAULT = "true";
index db21c6a..105b768 100644 (file)
@@ -135,7 +135,6 @@ This section provides the details of all the configurations required for the Car
 | carbon.custom.block.distribution | false | CarbonData has its own scheduling algorithm to suggest to Spark on how many tasks needs to be launched and how much work each task need to do in a Spark cluster for any query on CarbonData. When this configuration is true, CarbonData would distribute the available blocks to be scanned among the available number of cores. For Example:If there are 10 blocks to be scanned and only 3 tasks can be run(only 3 executor cores available in the cluster), CarbonData would combine blocks as 4,3,3 and give it to 3 tasks to run. **NOTE:** When this configuration is false, as per the ***carbon.task.distribution*** configuration, each block/blocklet would be given to each task. |
 | enable.query.statistics | false | CarbonData has extensive logging which would be useful for debugging issues related to performance or hard to locate issues. This configuration when made ***true*** would log additional query statistics information to more accurately locate the issues being debugged.**NOTE:** Enabling this would log more debug information to log files, there by increasing the log files size significantly in short span of time. It is advised to configure the log files size, retention of log files parameters in log4j properties appropriately. Also extensive logging is an increased IO operation and hence over all query performance might get reduced. Therefore it is recommended to enable this configuration only for the duration of debugging. |
 | enable.unsafe.in.query.processing | false | CarbonData supports unsafe operations of Java to avoid GC overhead for certain operations. This configuration enables to use unsafe functions in CarbonData while scanning the  data during query. |
-| carbon.query.validate.direct.query.on.datamap | true | CarbonData supports creating pre-aggregate table datamaps as an independent tables. For some debugging purposes, it might be required to directly query from such datamap tables. This configuration allows to query on such datamaps. |
 | carbon.max.driver.threads.for.block.pruning | 4 | Number of threads used for driver pruning when the carbon files are more than 100k Maximum memory. This configuration can used to set number of threads between 1 to 4. |
 | carbon.heap.memory.pooling.threshold.bytes | 1048576 | CarbonData supports unsafe operations of Java to avoid GC overhead for certain operations. Using unsafe, memory can be allocated on Java Heap or off heap. This configuration controls the allocation mechanism on Java HEAP. If the heap memory allocations of the given size is greater or equal than this value,it should go through the pooling mechanism. But if set this size to -1, it should not go through the pooling mechanism. Default value is 1048576(1MB, the same as Spark). Value to be specified in bytes. |
 | carbon.push.rowfilters.for.vector | false | When enabled complete row filters will be handled by carbon in case of vector. If it is disabled then only page level pruning will be done by carbon and row level filtering will be done by spark for vector. And also there are scan optimizations in carbon to avoid multiple data copies when this parameter is set to false. There is no change in flow for non-vector based queries. |
@@ -211,7 +210,7 @@ RESET
 | carbon.options.sort.scope                 | Specifies how the current data load should be sorted with. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.sort.scope for detailed information. |
 | carbon.options.global.sort.partitions     |                                                              |
 | carbon.options.serialization.null.format  | Default Null value representation in the data being loaded. **NOTE:** Refer to [Data Loading Configuration](#data-loading-configuration)#carbon.options.serialization.null.format for detailed information. |
-| carbon.query.directQueryOnDataMap.enabled | Specifies whether datamap can be queried directly. This is useful for debugging purposes.**NOTE: **Refer to [Query Configuration](#query-configuration)#carbon.query.validate.direct.query.on.datamap for detailed information. |
+| carbon.query.directQueryOnDataMap.enabled | Specifies whether datamap can be queried directly. This is useful for debugging purposes.**NOTE: **Refer to [Query Configuration](#query-configuration) for detailed information. |
 
 **Examples:**
 
index d70e179..4149f6e 100644 (file)
@@ -483,48 +483,6 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     executorService.shutdown()
   }
 
-  test("support set carbon.query.directQueryOnDataMap.enabled=true") {
-    val rootPath = new File(this.getClass.getResource("/").getPath
-      + "../../../..").getCanonicalPath
-    val testData = s"$rootPath/integration/spark-common-test/src/test/resources/sample.csv"
-    sql("drop table if exists mainTable")
-    sql(
-      s"""
-         | CREATE TABLE mainTable
-         |   (id Int,
-         |   name String,
-         |   city String,
-         |   age Int)
-         | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-
-    sql(
-      s"""
-         | LOAD DATA LOCAL INPATH '$testData'
-         | into table mainTable
-       """.stripMargin)
-
-    sql(
-      s"""
-         | create datamap preagg_sum on table mainTable
-         | using 'preaggregate'
-         | as select id,sum(age) from mainTable group by id
-       """.stripMargin)
-
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "true")
-
-    sql("set carbon.query.directQueryOnDataMap.enabled=true")
-    checkAnswer(sql("select count(*) from maintable_preagg_sum"), Row(4))
-    sql("set carbon.query.directQueryOnDataMap.enabled=false")
-    val exception: Exception = intercept[AnalysisException] {
-      sql("select count(*) from maintable_preagg_sum").collect()
-    }
-    assert(exception.getMessage.contains("Query On DataMap not supported"))
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false")
-  }
-
   class QueryTask(query: String) extends Callable[String] {
     override def call(): String = {
       var result = "SUCCESS"
index 411d5a3..e018536 100644 (file)
@@ -42,7 +42,7 @@ class QueryTest extends PlanTest {
   Locale.setDefault(Locale.US)
 
   CarbonProperties.getInstance()
-    .addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false")
+    .addProperty(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, "true")
 
   /**
    * Runs the plan and makes sure the answer contains all of the keywords, or the
index 319f84b..4e5b764 100644 (file)
@@ -631,12 +631,14 @@ object PreAggregateUtil {
           case _ => a.getAggFunction}}(${a.getColumnName})"
       } else {
         groupingExpressions += a.getColumnName
-        aggregateColumns+= a.getColumnName
+        aggregateColumns += a.getColumnName
       }
     }
+    val groupByString = if (groupingExpressions.nonEmpty) {
+      s" group by ${ groupingExpressions.mkString(",") }"
+    } else { "" }
     s"select ${ aggregateColumns.mkString(",") } " +
-    s"from $databaseName.${ tableSchema.getTableName }" +
-    s" group by ${ groupingExpressions.mkString(",") }"
+    s"from $databaseName.${ tableSchema.getTableName }" + groupByString
   }
 
   /**
index 0f350b9..3986839 100644 (file)
@@ -106,22 +106,27 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
   def validateQueryDirectlyOnDataMap(relations: Seq[CarbonDecoderRelation]): Unit = {
     var isPreAggDataMapExists = false
     // first check if pre aggregate data map exists or not
-    relations.foreach{relation =>
+    relations.foreach { relation =>
       if (relation.carbonRelation.carbonTable.isChildDataMap) {
         isPreAggDataMapExists = true
       }
     }
-    val validateQuery = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP,
-        CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean
     var isThrowException = false
-    // if validate query is enabled and relation contains pre aggregate data map
-    if (validateQuery && isPreAggDataMapExists) {
+    // if relation contains pre aggregate data map
+    if (isPreAggDataMapExists) {
       val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
       if (null != carbonSessionInfo) {
-        val supportQueryOnDataMap = CarbonProperties.getInstance.getProperty(
-          CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
-            CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean
+        lazy val sessionPropertyValue = CarbonProperties.getInstance
+          .getProperty(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
+            CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE)
+        // Check if property is set in thread params which would mean this is an internal call
+        // (from load or compaction) and should be of highest priority. Otherwise get from
+        // session(like user has dynamically given the value using set command). If not found in
+        // session then look for the property in carbon.properties file, otherwise use default
+        // value 'false'.
+        val supportQueryOnDataMap = CarbonEnv
+          .getThreadParam(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
+            sessionPropertyValue).toBoolean
         if (!supportQueryOnDataMap) {
           isThrowException = true
         }