[CARBONDATA-3201] Added load level SORT_SCOPE
authornamanrastogi <naman.rastogi.52@gmail.com>
Fri, 21 Dec 2018 07:33:30 +0000 (13:03 +0530)
committerkumarvishal09 <kumarvishal1802@gmail.com>
Wed, 9 Jan 2019 08:36:17 +0000 (14:06 +0530)
Added SORT_SCOPE in Load Options & in SET Command

1. Added load level SORT_SCOPE
2. Added Sort_Scope for PreAgg
3. Added sort_scope msg for LoadDataCommand
4. Added property CARBON.TABLE.LOAD.SORT.SCOPE.<database>.<table> to set table level sort_scope property
5. Removed test case verifying LOAD_OPTIONS with SORT_SCOPE

Load level SORT_SCOPE
LOAD DATA INPATH 'path/to/data.csv'
INTO TABLE my_table
OPTIONS (
   'sort_scope'='no_sort'
)
Priority of SORT_SCOPE (highest to lowest):
1. Load level (if provided)
2. Table level (if provided)
3. Default

This closes #3014

13 files changed:
core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithSortScope.scala
integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java

index 5cf6163..eef2bef 100644 (file)
@@ -81,6 +81,12 @@ public final class CarbonLoadOptionConstants {
       "carbon.options.sort.scope";
 
   /**
+   * option to specify table level sort_scope
+   */
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String CARBON_TABLE_LOAD_SORT_SCOPE = "carbon.table.load.sort.scope.";
+
+  /**
    * option to specify the batch sort size inmb
    */
   @CarbonProperty(dynamicConfigurable = true)
index f49747f..d9aa214 100644 (file)
@@ -161,7 +161,7 @@ public class SessionParams implements Serializable, Cloneable {
         isValid = CarbonUtil.isValidSortOption(value);
         if (!isValid) {
           throw new InvalidConfigurationException("The sort scope " + key
-              + " can have only either BATCH_SORT or LOCAL_SORT or NO_SORT.");
+              + " can have only either NO_SORT, BATCH_SORT, LOCAL_SORT or GLOBAL_SORT.");
         }
         break;
       case CARBON_OPTIONS_BATCH_SORT_SIZE_INMB:
@@ -229,6 +229,12 @@ public class SessionParams implements Serializable, Cloneable {
           if (!isValid) {
             throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
           }
+        } else if (key.startsWith(CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE)) {
+          isValid = CarbonUtil.isValidSortOption(value);
+          if (!isValid) {
+            throw new InvalidConfigurationException("The sort scope " + key
+                + " can have only either NO_SORT, BATCH_SORT, LOCAL_SORT or GLOBAL_SORT.");
+          }
         } else {
           throw new InvalidConfigurationException(
               "The key " + key + " not supported for dynamic configuration.");
index 890475d..dac9124 100644 (file)
@@ -32,25 +32,6 @@ class TestCreateTableWithSortScope extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS tableWithBatchSort")
     sql("DROP TABLE IF EXISTS tableWithNoSort")
     sql("DROP TABLE IF EXISTS tableWithUnsupportSortScope")
-    sql("DROP TABLE IF EXISTS tableLoadWithSortScope")
-  }
-
-  test("Do not support load data with specify sort scope") {
-    sql(
-    s"""
-       | CREATE TABLE tableLoadWithSortScope(
-       | intField INT,
-       | stringField STRING
-       | )
-       | STORED BY 'carbondata'
-       | TBLPROPERTIES('SORT_COLUMNS'='stringField')
-       """.stripMargin)
-
-    val exception_loaddata_sortscope: Exception = intercept[Exception] {
-      sql("LOAD DATA LOCAL INPATH '/path/to/data' INTO TABLE tableLoadWithSortScope " +
-          "OPTIONS('SORT_SCOPE'='GLOBAL_SORT')")
-    }
-    assert(exception_loaddata_sortscope.getMessage.contains("Error: Invalid option(s): sort_scope"))
   }
 
   test("test create table with sort scope in normal cases") {
index b382693..2138580 100644 (file)
@@ -109,7 +109,7 @@ object StreamSinkFactory {
     carbonLoadModel.setSegmentId(segmentId)
 
     // Used to generate load commands for child tables in case auto-handoff is fired.
-    val loadMetaEvent = new LoadMetadataEvent(carbonTable, false)
+    val loadMetaEvent = new LoadMetadataEvent(carbonTable, false, parameters.asJava)
     OperationListenerBus.getInstance().fireEvent(loadMetaEvent, operationContext)
     // start server if necessary
     val server = startDictionaryServer(
index 4b14879..a669931 100644 (file)
@@ -1105,7 +1105,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       "SORT_COLUMN_BOUNDS",
       "LOAD_MIN_SIZE_INMB",
       "RANGE_COLUMN",
-      "SCALE_FACTOR"
+      "SCALE_FACTOR",
+      "SORT_SCOPE"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder
index 6defb0a..419fa16 100644 (file)
@@ -95,9 +95,9 @@ case class CarbonAlterTableCompactionCommand(
       // If set to true then only loadCommands for compaction will be created.
       val loadMetadataEvent =
         if (alterTableModel.compactionType.equalsIgnoreCase(CompactionType.STREAMING.name())) {
-          new LoadMetadataEvent(table, false)
+          new LoadMetadataEvent(table, false, Map.empty[String, String].asJava)
         } else {
-          new LoadMetadataEvent(table, true)
+          new LoadMetadataEvent(table, true, Map.empty[String, String].asJava)
         }
       OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
     }
index 67172af..95c0767 100644 (file)
@@ -142,7 +142,7 @@ case class CarbonLoadDataCommand(
     }
     operationContext.setProperty("isOverwrite", isOverwriteTable)
     if(CarbonUtil.hasAggregationDataMap(table)) {
-      val loadMetadataEvent = new LoadMetadataEvent(table, false)
+      val loadMetadataEvent = new LoadMetadataEvent(table, false, options.asJava)
       OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
     }
     Seq.empty
@@ -191,10 +191,34 @@ case class CarbonLoadDataCommand(
     optionsFinal
       .put("complex_delimiter_level_4",
         ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_4.value())
-    optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+
+    /**
+    * Priority of sort_scope assignment :
+    * -----------------------------------
+    *
+    * 1. Load Options  ->
+    *     LOAD DATA INPATH 'data.csv' INTO TABLE tableName OPTIONS('sort_scope'='no_sort')
+    *
+    * 2. Session property CARBON_TABLE_LOAD_SORT_SCOPE  ->
+    *     SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=no_sort
+    *     SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=batch_sort
+    *     SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=local_sort
+    *     SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=global_sort
+    *
+    * 3. Sort Scope provided in TBLPROPERTIES
+    * 4. Session property CARBON_OPTIONS_SORT_SCOPE
+    * 5. Default Sort Scope LOAD_SORT_SCOPE
+    */
+    optionsFinal.put("sort_scope",
+      options.getOrElse("sort_scope",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
+          table.getTableName,
+          tableProperties.asScala.getOrElse("sort_scope",
+            carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+              carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+                CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))))
+
       optionsFinal
         .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
       val factPath = if (dataFrame.isDefined) {
@@ -304,6 +328,7 @@ case class CarbonLoadDataCommand(
       }
       val partitionStatus = SegmentStatus.SUCCESS
       val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
+      LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
       if (carbonLoadModel.getUseOnePass) {
         loadDataUsingOnePass(
           sparkSession,
index eb98264..4a0b492 100644 (file)
@@ -349,7 +349,8 @@ object CompactionProcessMetaListener extends OperationEventListener {
           TableIdentifier(childTableName, Some(childDatabaseName)),
           childDataFrame,
           false,
-          sparkSession)
+          sparkSession,
+          mutable.Map.empty[String, String])
         val uuid = Option(operationContext.getProperty("uuid")).
           getOrElse(UUID.randomUUID()).toString
         operationContext.setProperty("uuid", uuid)
@@ -377,7 +378,8 @@ object CompactionProcessMetaListener extends OperationEventListener {
         TableIdentifier(childTableName, Some(childDatabaseName)),
         childDataFrame,
         false,
-        sparkSession)
+        sparkSession,
+        mutable.Map.empty[String, String])
       val uuid = Option(operationContext.getProperty("uuid")).getOrElse(UUID.randomUUID()).toString
       loadCommand.processMetadata(sparkSession)
       operationContext.setProperty(table.getTableName + "_Compaction", loadCommand)
@@ -453,6 +455,7 @@ object LoadProcessMetaListener extends OperationEventListener {
             childDataFrame,
             isOverwrite,
             sparkSession,
+            tableEvent.getOptions.asScala,
             timeseriesParentTableName = childSelectQuery._2)
           operationContext.setProperty("uuid", uuid)
           loadCommand.operationContext.setProperty("uuid", uuid)
index 4116ed6..b729347 100644 (file)
@@ -275,7 +275,8 @@ case class PreAggregateTableHelper(
       tableIdentifier,
       dataFrame,
       isOverwrite = false,
-      sparkSession = sparkSession)
+      sparkSession = sparkSession,
+      mutable.Map.empty[String, String])
     loadCommand.processMetadata(sparkSession)
     Seq.empty
   }
index 4e5b764..0314dd8 100644 (file)
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.command.preaaggregate
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession, _}
@@ -895,6 +896,7 @@ object PreAggregateUtil {
       dataFrame: DataFrame,
       isOverwrite: Boolean,
       sparkSession: SparkSession,
+      options: mutable.Map[String, String],
       timeseriesParentTableName: String = ""): CarbonLoadDataCommand = {
     val headers = columns.asScala.filter { column =>
       !column.getColumnName.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
index 9f97828..5cc5bc8 100644 (file)
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
@@ -84,10 +84,7 @@ case class CarbonSetCommand(command: SetCommand)
 object CarbonSetCommand {
   def validateAndSetValue(sessionParams: SessionParams, key: String, value: String): Unit = {
     val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
-    if (isCarbonProperty) {
-      sessionParams.addProperty(key, value)
-    }
-    else if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
+    if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
       if (key.split("\\.").length == 5) {
         sessionParams.addProperty(key.toLowerCase(), value)
       }
@@ -117,6 +114,18 @@ object CarbonSetCommand {
           "property should be in \" carbon.load.datamaps.parallel.<database_name>" +
           ".<table_name>=<true/false> \" format.")
       }
+    } else if (key.startsWith(CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE)) {
+      if (key.split("\\.").length == 7) {
+        sessionParams.addProperty(key.toLowerCase(), value)
+      }
+      else {
+        throw new MalformedCarbonCommandException(
+          "property should be in \" carbon.table.load.sort.scope.<database_name>" +
+          ".<table_name>=<sort_sope> \" format.")
+      }
+    }
+    else if (isCarbonProperty) {
+      sessionParams.addProperty(key, value)
     }
   }
 
index 1610d8d..8b03630 100644 (file)
@@ -128,6 +128,34 @@ class SetCommandTestCase extends Spark2QueryTest with BeforeAndAfterAll{
         sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}"))
     }
   }
+
+  test(s"test set carbon.table.load.sort.scope for valid options") {
+    checkAnswer(
+      sql(s"set carbon.table.load.sort.scope.db.tbl=no_sort"),
+      sql(s"set carbon.table.load.sort.scope.db.tbl"))
+
+    checkAnswer(
+      sql(s"set carbon.table.load.sort.scope.db.tbl=batch_sort"),
+      sql(s"set carbon.table.load.sort.scope.db.tbl"))
+
+    checkAnswer(
+      sql(s"set carbon.table.load.sort.scope.db.tbl=local_sort"),
+      sql(s"set carbon.table.load.sort.scope.db.tbl"))
+
+    checkAnswer(
+      sql(s"set carbon.table.load.sort.scope.db.tbl=global_sort"),
+      sql(s"set carbon.table.load.sort.scope.db.tbl"))
+  }
+
+  test(s"test set carbon.table.load.sort.scope for invalid options")
+  {
+    intercept[InvalidConfigurationException] {
+      checkAnswer(
+        sql(s"set carbon.table.load.sort.scope.db.tbl=fake_sort"),
+        sql(s"set carbon.table.load.sort.scope.db.tbl"))
+    }
+  }
+
   override def afterAll {
     sqlContext.sparkSession.catalog.clearCache()
     sql("reset")
index 1e53817..c55af83 100644 (file)
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.processing.loading.events;
 
+import java.util.Map;
+
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.events.Event;
@@ -101,9 +103,13 @@ public class LoadEvents {
   public static class LoadMetadataEvent extends Event {
     private CarbonTable carbonTable;
     private boolean isCompaction;
-    public LoadMetadataEvent(CarbonTable carbonTable, boolean isCompaction) {
+    private Map<String, String> options;
+
+    public LoadMetadataEvent(CarbonTable carbonTable, boolean isCompaction,
+        Map<String, String> options) {
       this.carbonTable = carbonTable;
       this.isCompaction = isCompaction;
+      this.options = options;
     }
     public boolean isCompaction() {
       return isCompaction;
@@ -111,6 +117,11 @@ public class LoadEvents {
     public CarbonTable getCarbonTable() {
       return carbonTable;
     }
+
+
+    public Map<String, String> getOptions() {
+      return options;
+    }
   }
 
   public static class LoadTablePostStatusUpdateEvent extends Event {