[CARBONDATA-3200] No-Sort compaction
author namanrastogi <naman.rastogi.52@gmail.com>
Wed, 2 Jan 2019 10:56:09 +0000 (16:26 +0530)
committer kumarvishal09 <kumarvishal1802@gmail.com>
Wed, 9 Jan 2019 12:35:35 +0000 (18:05 +0530)
When data is loaded with SORT_SCOPE as NO_SORT and then compacted, the data still remains unsorted. This does not affect queries much.
The major purpose of compaction is to pack the data better and improve query performance.

Now, the expected behaviour of compaction is to sort the data, so that query performance improves after compaction.
The columns to sort on are taken from SORT_COLUMNS.
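
For example, a minimal sequence of commands that exercises this flow (the table name, columns and CSV path are hypothetical; the options mirror the TestHybridCompaction suite added in this change):

  // Hypothetical names and path, for illustration only.
  sql("CREATE TABLE t1 (seq INT, state STRING, age INT) STORED BY 'carbondata' " +
    "TBLPROPERTIES('sort_columns'='state, age', 'sort_scope'='local_sort')")
  // A NO_SORT load leaves the new segment unsorted on disk.
  sql("LOAD DATA INPATH '/tmp/t1.csv' INTO TABLE t1 OPTIONS('sort_scope'='no_sort')")
  // Major compaction now merges the segments and sorts the result on SORT_COLUMNS.
  sql("ALTER TABLE t1 COMPACT 'major'")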

The new compaction works as follows (a simplified merge sketch follows the steps):

1. Sort the unsorted and restructured data and store it in sort temp files.
2. Pick the next row from those temp files and from the already sorted carbondata files, according to a comparator on SORT_COLUMNS.
3. Write the row to a new segment (similar to the old compaction flow).
4. Repeat steps 2 and 3 until no rows are left.
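
A minimal sketch of steps 2 and 3, assuming every source (a sort temp file or an already sorted carbondata file) is exposed as an iterator of rows ordered on SORT_COLUMNS. The real flow uses RawResultIterator, InMemorySortTempChunkHolder and the record-holder heap in SingleThreadFinalSortFilesMerger; the names below are illustrative:

  // Illustrative only: "Row", "sources" and "write" stand in for the real types.
  import scala.collection.mutable

  def mergeSorted[Row](sources: Seq[Iterator[Row]], bySortColumns: Ordering[Row])
      (write: Row => Unit): Unit = {
    // PriorityQueue is a max-heap, so reverse the ordering to always pop the smallest row.
    implicit val heapOrdering: Ordering[(Row, Iterator[Row])] =
      Ordering.by[(Row, Iterator[Row]), Row](_._1)(bySortColumns.reverse)
    val heap = mutable.PriorityQueue.empty[(Row, Iterator[Row])]
    sources.filter(_.hasNext).foreach(it => heap.enqueue((it.next(), it)))
    while (heap.nonEmpty) {
      val (row, source) = heap.dequeue()   // step 2: smallest row on the sort columns
      write(row)                           // step 3: write to the new segment
      if (source.hasNext) heap.enqueue((source.next(), source))
    }
  }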

This closes #3029

22 files changed:
core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
core/src/main/java/org/apache/carbondata/core/metadata/blocklet/DataFileFooter.java
core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
format/src/main/thrift/carbondata_index.thrift
integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/compaction/TestHybridCompaction.scala [new file with mode: 0644]
integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/InMemorySortTempChunkHolder.java [new file with mode: 0644]
processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java

index c38124d..8ef2198 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil;
@@ -101,6 +102,8 @@ public class TableBlockInfo implements Distributable, Serializable {
 
   private String dataMapWriterPath;
 
+  private transient DataFileFooter dataFileFooter;
+
   /**
    * comparator to sort by block size in descending order.
    * Since each line is not exactly the same, the size of a InputSplit may differs,
@@ -462,6 +465,14 @@ public class TableBlockInfo implements Distributable, Serializable {
     this.dataMapWriterPath = dataMapWriterPath;
   }
 
+  public DataFileFooter getDataFileFooter() {
+    return dataFileFooter;
+  }
+
+  public void setDataFileFooter(DataFileFooter dataFileFooter) {
+    this.dataFileFooter = dataFileFooter;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("TableBlockInfo{");
index 2d8b082..191056d 100644 (file)
@@ -165,6 +165,7 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
       int ordinal = 0;
       int taskMinMaxOrdinal = 0;
       BlockletInfo blockletInfo = blockletList.get(index);
+      blockletInfo.setSorted(fileFooter.isSorted());
       BlockletMinMaxIndex minMaxIndex = blockletInfo.getBlockletIndex().getMinMaxIndex();
       // get min max values for columns to be cached
       byte[][] minValuesForColumnsToBeCached = BlockletDataMapUtil
index 420cd4e..88706b1 100644 (file)
@@ -89,6 +89,8 @@ public class BlockletInfo implements Serializable, Writable {
 
   private int[] numberOfRowsPerPage;
 
+  private Boolean isSorted = true;
+
   /**
    * @return the numberOfRows
    */
@@ -222,6 +224,11 @@ public class BlockletInfo implements Serializable, Writable {
     }
     writeChunkInfoForOlderVersions(output);
 
+    boolean isSortedPresent = (isSorted != null);
+    output.writeBoolean(isSortedPresent);
+    if (isSortedPresent) {
+      output.writeBoolean(isSorted);
+    }
   }
 
   /**
@@ -288,6 +295,10 @@ public class BlockletInfo implements Serializable, Writable {
       measureChunksLength.add(input.readInt());
     }
     readChunkInfoForOlderVersions(input);
+    final boolean isSortedPresent = input.readBoolean();
+    if (isSortedPresent) {
+      this.isSorted = input.readBoolean();
+    }
   }
 
   /**
@@ -317,4 +328,12 @@ public class BlockletInfo implements Serializable, Writable {
   public void setNumberOfRowsPerPage(int[] numberOfRowsPerPage) {
     this.numberOfRowsPerPage = numberOfRowsPerPage;
   }
+
+  public Boolean isSorted() {
+    return isSorted;
+  }
+
+  public void setSorted(Boolean sorted) {
+    isSorted = sorted;
+  }
 }
index 1f45716..8a8652b 100644 (file)
@@ -75,6 +75,11 @@ public class DataFileFooter implements Serializable {
   private long schemaUpdatedTimeStamp;
 
   /**
+   * Boolean indicating whether the data in the file is sorted; null if unknown.
+   */
+  private Boolean isSorted = true;
+
+  /**
    * @return the versionId
    */
   public ColumnarFormatVersion getVersionId() {
@@ -179,4 +184,12 @@ public class DataFileFooter implements Serializable {
   public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
     this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
   }
+
+  public void setSorted(Boolean isSorted) {
+    this.isSorted = isSorted;
+  }
+
+  public Boolean isSorted() {
+    return isSorted;
+  }
 }
index 5fd4b1d..ab7c577 100644 (file)
@@ -212,6 +212,9 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
             .isUseMinMaxForPruning()) {
         blockInfo.setBlockOffset(blockletDetailInfo.getBlockFooterOffset());
         DataFileFooter fileFooter = filePathToFileFooterMapping.get(blockInfo.getFilePath());
+        if (null != blockInfo.getDataFileFooter()) {
+          fileFooter = blockInfo.getDataFileFooter();
+        }
         if (null == fileFooter) {
           blockInfo.setDetailInfo(null);
           fileFooter = CarbonUtil.readMetadataFile(blockInfo);
index 601ce50..64d30c2 100644 (file)
@@ -126,6 +126,13 @@ public abstract class AbstractDataFileFooterConverter {
           dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
           dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo));
           dataFileFooter.setSegmentInfo(segmentInfo);
+          if (readIndexHeader.isSetIs_sort()) {
+            dataFileFooter.setSorted(readIndexHeader.isIs_sort());
+          } else {
+            if (tableBlockInfo.getVersion() == ColumnarFormatVersion.V3) {
+              dataFileFooter.setSorted(null);
+            }
+          }
           dataFileFooters.add(dataFileFooter);
           if (++index == tableBlockInfoList.size()) {
             break;
index 438e3e3..d6d91ed 100644 (file)
@@ -73,6 +73,11 @@ public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
     dataFileFooter.setNumberOfRows(footer.getNum_rows());
     dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
     dataFileFooter.setSchemaUpdatedTimeStamp(fileHeader.getTime_stamp());
+    if (footer.isSetIs_sort()) {
+      dataFileFooter.setSorted(footer.isIs_sort());
+    } else {
+      dataFileFooter.setSorted(null);
+    }
     List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
     List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
     for (int i = 0; i < table_columns.size(); i++) {
index f77a256..e8b81f7 100644 (file)
@@ -32,6 +32,7 @@ struct IndexHeader{
   3: required carbondata.SegmentInfo segment_info;     // Segment info (will be same/repeated for all files in this segment)
   4: optional i32 bucket_id; // Bucket number in which file contains
   5: optional i64 schema_time_stamp; // Timestamp to compare column schema against master schema
+  6: optional bool is_sort; // True if the data in this file is sorted; used during compaction to decide whether a merge sort is needed
 }
 
 /**
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/compaction/TestHybridCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/compaction/TestHybridCompaction.scala
new file mode 100644 (file)
index 0000000..4c37ebd
--- /dev/null
@@ -0,0 +1,262 @@
+package org.apache.carbondata.spark.testsuite.compaction
+
+import java.io.{BufferedWriter, File, FileWriter}
+
+import scala.collection.mutable.ListBuffer
+
+import au.com.bytecode.opencsv.CSVWriter
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.test.util.QueryTest
+import org.junit.Assert
+import org.scalatest.Matchers._
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+
+class TestHybridCompaction extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+
+  val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath
+
+  val csvPath1 =
+    s"$rootPath/integration/spark-common-test/src/test/resources/compaction/hybridCompaction1.csv"
+
+  val csvPath2 =
+    s"$rootPath/integration/spark-common-test/src/test/resources/compaction/hybridCompaction2.csv"
+
+  val tableName = "t1"
+
+
+  override def beforeAll: Unit = {
+    generateCSVFiles()
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "MM/dd/yyyy")
+  }
+
+
+  override def afterAll: Unit = {
+    deleteCSVFiles()
+  }
+
+
+  override def beforeEach(): Unit = {
+    dropTable()
+    createTable()
+  }
+
+
+  override def afterEach(): Unit = {
+    dropTable()
+  }
+
+
+  def generateCSVFiles(): Unit = {
+    val rows1 = new ListBuffer[Array[String]]
+    rows1 += Array("seq", "first", "last", "age", "city", "state", "date")
+    rows1 += Array("1", "Augusta", "Nichols", "20", "Varasdo", "WA", "07/05/2003")
+    rows1 += Array("2", "Luis", "Barnes", "39", "Oroaklim", "MT", "04/05/2048")
+    rows1 += Array("3", "Leah", "Guzman", "54", "Culeosa", "KS", "02/23/1983")
+    rows1 += Array("4", "Ian", "Ford", "61", "Rufado", "AL", "03/02/1995")
+    rows1 += Array("5", "Fanny", "Horton", "37", "Rorlihbem", "CT", "05/12/1987")
+    createCSV(rows1, csvPath1)
+
+    val rows2 = new ListBuffer[Array[String]]
+    rows2 += Array("seq", "first", "last", "age", "city", "state", "date")
+    rows2 += Array("11", "Claudia", "Sullivan", "42", "Dilwuani", "ND", "09/01/2003")
+    rows2 += Array("12", "Kate", "Adkins", "54", "Fokafrid", "WA", "10/13/2013")
+    rows2 += Array("13", "Eliza", "Lynch", "23", "Bonpige", "ME", "05/02/2015")
+    rows2 += Array("14", "Sarah", "Fleming", "60", "Duvugove", "IA", "04/15/2036")
+    rows2 += Array("15", "Maude", "Bass", "44", "Ukozedka", "CT", "11/08/1988")
+    createCSV(rows2, csvPath2)
+  }
+
+
+  def createCSV(rows: ListBuffer[Array[String]], csvPath: String): Unit = {
+    val out = new BufferedWriter(new FileWriter(csvPath))
+    val writer: CSVWriter = new CSVWriter(out)
+
+    for (row <- rows) {
+      writer.writeNext(row)
+    }
+
+    out.close()
+    writer.close()
+  }
+
+
+  def deleteCSVFiles(): Unit = {
+    try {
+      FileUtils.forceDelete(new File(csvPath1))
+      FileUtils.forceDelete(new File(csvPath2))
+    }
+    catch {
+      case e: Exception =>
+        e.printStackTrace()
+        Assert.fail(e.getMessage)
+    }
+  }
+
+
+  def createTable(): Unit = {
+    sql(
+      s"""
+         | CREATE TABLE $tableName(seq int, first string, last string,
+         |   age int, city string, state string, date date)
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(
+         |   'sort_scope'='local_sort',
+         |   'sort_columns'='state, age',
+         |   'dateformat'='MM/dd/yyyy')
+      """.stripMargin)
+  }
+
+
+  def loadUnsortedData(n : Int = 1): Unit = {
+    for(_ <- 1 to n) {
+      sql(
+        s"""
+           | LOAD DATA INPATH '$csvPath1' INTO TABLE $tableName
+           | OPTIONS (
+           |   'sort_scope'='no_sort')""".stripMargin)
+    }
+  }
+
+
+  def loadSortedData(n : Int = 1): Unit = {
+    for(_ <- 1 to n) {
+      sql(
+        s"""
+           | LOAD DATA INPATH '$csvPath2' INTO TABLE $tableName
+           | OPTIONS (
+           |   'sort_scope'='local_sort')""".stripMargin)
+    }
+  }
+
+
+  def dropTable(): Unit = {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+
+  test("SORTED LOADS") {
+    loadSortedData(2)
+    sql(s"ALTER TABLE $tableName COMPACT 'major'")
+
+    val out = sql(s"SELECT state, age FROM $tableName").collect()
+    out.map(_.get(0).toString) should
+    equal(Array("CT", "CT", "IA", "IA", "ME", "ME", "ND", "ND", "WA", "WA"))
+  }
+
+
+  test("UNSORTED LOADS") {
+    loadUnsortedData(2)
+    sql(s"ALTER TABLE $tableName COMPACT 'major'")
+
+    val out = sql(s"SELECT state, age FROM $tableName").collect()
+    out.map(_.get(0).toString) should
+    equal(Array("AL", "AL", "CT", "CT", "KS", "KS", "MT", "MT", "WA", "WA"))
+  }
+
+
+  test("MIXED LOADS") {
+    loadSortedData()
+    loadUnsortedData()
+    sql(s"ALTER TABLE $tableName COMPACT 'major'")
+    val out = sql(s"SELECT state, age FROM $tableName").collect()
+    out.map(_.get(0).toString) should
+    equal(Array("AL", "CT", "CT", "IA", "KS", "ME", "MT", "ND", "WA", "WA"))
+    out.map(_.get(1).toString) should
+    equal(Array("61", "37", "44", "60", "54", "23", "39", "42", "20", "54"))
+  }
+
+
+  test("INSERT") {
+    loadSortedData()
+    loadUnsortedData()
+    sql(
+      s"""
+         | INSERT INTO $tableName
+         | VALUES('20', 'Naman', 'Rastogi', '23', 'Bengaluru', 'ZZ', '12/28/2018')
+      """.stripMargin)
+    sql(s"ALTER TABLE $tableName COMPACT 'major'")
+
+    val out = sql(s"SELECT state FROM $tableName").collect()
+    out.map(_.get(0).toString) should equal(
+      Array("AL", "CT", "CT", "IA", "KS", "ME", "MT", "ND", "WA", "WA", "ZZ"))
+  }
+
+
+  test("UPDATE") {
+    loadSortedData()
+    loadUnsortedData()
+    sql(s"UPDATE  $tableName SET (state)=('CT') WHERE seq='13'").collect()
+    sql(s"ALTER TABLE $tableName COMPACT 'major'")
+
+    val out = sql(s"SELECT state FROM $tableName WHERE seq='13'").collect()
+    out.map(_.get(0).toString) should equal(Array("CT"))
+  }
+
+  test("DELETE") {
+    loadSortedData()
+    loadUnsortedData()
+    sql(s"DELETE FROM $tableName WHERE seq='13'").collect()
+    sql(s"ALTER TABLE $tableName COMPACT 'major'")
+
+    val out = sql(s"SELECT state FROM $tableName").collect()
+    out.map(_.get(0).toString) should equal(
+      Array("AL", "CT", "CT", "IA", "KS", "MT", "ND", "WA", "WA"))
+  }
+
+
+  test("RESTRUCTURE TABLE REMOVE COLUMN NOT IN SORT_COLUMNS") {
+    loadSortedData()
+    loadUnsortedData()
+    sql(s"ALTER TABLE $tableName DROP COLUMNS(city)")
+    sql(s"ALTER TABLE $tableName COMPACT 'major'")
+
+    val out = sql(s"SELECT age FROM $tableName").collect()
+    out.map(_.get(0).toString) should equal(
+      Array("61", "37", "44", "60", "54", "23", "39", "42", "20", "54"))
+  }
+
+
+  test("RESTRUCTURE TABLE REMOVE COLUMN IN SORT_COLUMNS") {
+    loadSortedData()
+    loadUnsortedData()
+    sql(s"ALTER TABLE $tableName DROP COLUMNS(state)")
+    sql(s"ALTER TABLE $tableName COMPACT 'major'")
+
+    val out = sql(s"SELECT age FROM $tableName").collect()
+    out.map(_.get(0).toString) should equal(
+      Array("20", "23", "37", "39", "42", "44", "54", "54", "60", "61"))
+  }
+
+
+  test("PREAGG") {
+    loadSortedData()
+    loadUnsortedData()
+    val datamapName = "d1"
+    val tableNameDatamapName = tableName + "_" + datamapName
+
+    sql(
+      s"""
+         | CREATE DATAMAP $datamapName
+         | ON TABLE $tableName
+         | USING 'preaggregate'
+         | AS
+         |   SELECT AVG(age), state
+         |   FROM $tableName
+         |   GROUP BY state
+      """.stripMargin)
+
+    loadSortedData()
+    loadUnsortedData()
+
+    sql(s"ALTER TABLE $tableName COMPACT 'major'")
+    val out = sql(s"SELECT * FROM $tableNameDatamapName").collect()
+    out.map(_.get(2).toString) should equal(
+      Array("AL", "CT", "IA", "KS", "ME", "MT", "ND", "WA"))
+  }
+
+}
index 6b2ee67..96d288f 100644 (file)
@@ -36,7 +36,7 @@ import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, SortScopeOptions}
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block._
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -99,7 +99,7 @@ class CarbonMergerRDD[K, V](
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
       var processor: AbstractResultProcessor = null
-      var rawResultIteratorList: java.util.List[RawResultIterator] = null
+      var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] = _
       try {
         // sorting the table block info List.
         val splitList = carbonSparkPartition.split.value.getAllSplits
@@ -159,7 +159,8 @@ class CarbonMergerRDD[K, V](
           CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
 
         val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
-          CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
+          CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList,
+            carbonTable.getSortScope != SortScopeOptions.SortScope.NO_SORT)
 
         carbonLoadModel.setTablePath(tablePath)
         // check for restructured block
@@ -180,7 +181,7 @@ class CarbonMergerRDD[K, V](
         }
         try {
           // fire a query and get the results.
-          rawResultIteratorList = exec.processTableBlocks(FileFactory.getConfiguration)
+          rawResultIteratorMap = exec.processTableBlocks(FileFactory.getConfiguration)
         } catch {
           case e: Throwable =>
             LOGGER.error(e)
@@ -196,7 +197,21 @@ class CarbonMergerRDD[K, V](
         val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
           carbonTable, carbonLoadModel.getTaskNo, mergeNumber, true, false)
 
-        if (restructuredBlockExists) {
+        if (carbonTable.getSortScope == SortScopeOptions.SortScope.NO_SORT ||
+          rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX).size() == 0) {
+
+          LOGGER.info("RowResultMergerProcessor flow is selected")
+          processor = new RowResultMergerProcessor(
+            databaseName,
+            factTableName,
+            segmentProperties,
+            tempStoreLoc,
+            carbonLoadModel,
+            carbonMergerMapping.campactionType,
+            partitionSpec)
+
+        } else {
+
           LOGGER.info("CompactionResultSortProcessor flow is selected")
           processor = new CompactionResultSortProcessor(
             carbonLoadModel,
@@ -205,19 +220,12 @@ class CarbonMergerRDD[K, V](
             carbonMergerMapping.campactionType,
             factTableName,
             partitionSpec)
-        } else {
-          LOGGER.info("RowResultMergerProcessor flow is selected")
-          processor =
-            new RowResultMergerProcessor(
-              databaseName,
-              factTableName,
-              segmentProperties,
-              tempStoreLoc,
-              carbonLoadModel,
-              carbonMergerMapping.campactionType,
-              partitionSpec)
+
         }
-        mergeStatus = processor.execute(rawResultIteratorList)
+
+        mergeStatus = processor.execute(
+          rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX),
+          rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX))
         mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber
 
       } catch {
@@ -231,7 +239,8 @@ class CarbonMergerRDD[K, V](
         // close all the query executor service and clean up memory acquired during query processing
         if (null != exec) {
           LOGGER.info("Cleaning up query resources acquired during compaction")
-          exec.close(rawResultIteratorList, queryStartTime)
+          exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX), queryStartTime)
+          exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX), queryStartTime)
         }
         // clean up the resources for processor
         if (null != processor) {
index c7c5bdc..aebe549 100644 (file)
@@ -118,7 +118,9 @@ class StreamHandoffRDD[K, V](
     CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, true, false)
     // use CompactionResultSortProcessor to sort data dan write to columnar files
     val processor = prepareHandoffProcessor(carbonTable)
-    val status = processor.execute(iteratorList)
+
+    // The iterator list here is unsorted. The sorted iterators are null.
+    val status = processor.execute(iteratorList, null)
 
     new Iterator[(K, V)] {
       private var finished = false
index b22599d..f557e9b 100644 (file)
@@ -35,10 +35,12 @@ public abstract class AbstractResultProcessor {
   /**
    * This method will perform the desired tasks of merging the selected slices
    *
-   * @param resultIteratorList
+   * @param unsortedResultIteratorList
+   * @param sortedResultIteratorList
    * @return
    */
-  public abstract boolean execute(List<RawResultIterator> resultIteratorList) throws Exception;
+  public abstract boolean execute(List<RawResultIterator> unsortedResultIteratorList,
+      List<RawResultIterator> sortedResultIteratorList) throws Exception;
 
   /**
    * This method will be sued to clean up the resources and close all the spawned threads to avoid
index ea123d5..79b66e2 100644 (file)
@@ -103,12 +103,20 @@ public class CarbonCompactionExecutor {
   /**
    * For processing of the table blocks.
    *
-   * @return List of Carbon iterators
+   * @return Map of String with Carbon iterators
+   * Map has 2 elements: UNSORTED and SORTED
+   * Map(UNSORTED) = List of Iterators which yield unsorted data
+   * Map(SORTED) = List of Iterators which yield sorted data
    */
-  public List<RawResultIterator> processTableBlocks(Configuration configuration) throws
-      QueryExecutionException, IOException {
-    List<RawResultIterator> resultList =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  public Map<String, List<RawResultIterator>> processTableBlocks(Configuration configuration)
+      throws QueryExecutionException, IOException {
+
+    Map<String, List<RawResultIterator>> resultList = new HashMap<>(2);
+    resultList.put(CarbonCompactionUtil.UNSORTED_IDX,
+        new ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE));
+    resultList.put(CarbonCompactionUtil.SORTED_IDX,
+        new ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE));
+
     List<TableBlockInfo> list = null;
     QueryModelBuilder builder = new QueryModelBuilder(carbonTable)
         .projectAllColumns()
@@ -126,6 +134,10 @@ public class CarbonCompactionExecutor {
       // for each segment get taskblock info
       TaskBlockInfo taskBlockInfo = taskMap.getValue();
       Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
+      // Check if block needs sorting or not
+      boolean sortingRequired =
+          CarbonCompactionUtil.isRestructured(listMetadata, carbonTable.getTableLastUpdatedTime())
+              || !CarbonCompactionUtil.isSorted(listMetadata.get(0));
       for (String task : taskBlockListMapping) {
         list = taskBlockInfo.getTableBlockInfoList(task);
         Collections.sort(list);
@@ -133,10 +145,15 @@ public class CarbonCompactionExecutor {
             "for task -" + task + "- in segment id -" + segmentId + "- block size is -" + list
                 .size());
         queryModel.setTableBlockInfos(list);
-        resultList.add(
-            new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
-                sourceSegProperties,
-                destinationSegProperties, false));
+        if (sortingRequired) {
+          resultList.get(CarbonCompactionUtil.UNSORTED_IDX).add(
+              new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
+                  sourceSegProperties, destinationSegProperties, false));
+        } else {
+          resultList.get(CarbonCompactionUtil.SORTED_IDX).add(
+              new RawResultIterator(executeBlockList(list, segmentId, task, configuration),
+                  sourceSegProperties, destinationSegProperties, false));
+        }
       }
     }
     return resultList;
index c0af1a4..1bf30b5 100644 (file)
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -48,6 +49,19 @@ public class CarbonCompactionUtil {
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(CarbonCompactionUtil.class.getName());
 
+
+  /**
+   * processTableBlocks returns a Map<String, List<RawResultIterator>> with two entries.
+   *
+   * The entry with key UNSORTED_IDX is the list of all the iterators
+   * which yield unsorted data.
+   *
+   * The entry with key SORTED_IDX is the list of all the iterators
+   * which yield sorted data.
+   */
+  public static final String UNSORTED_IDX = "UNSORTED_IDX";
+  public static final String SORTED_IDX = "SORTED_IDX";
+
   /**
    * To create a mapping of Segment Id and TableBlockInfo.
    *
@@ -109,7 +123,7 @@ public class CarbonCompactionUtil {
    * @return
    */
   public static Map<String, List<DataFileFooter>> createDataFileFooterMappingForSegments(
-      List<TableBlockInfo> tableBlockInfoList) throws IOException {
+      List<TableBlockInfo> tableBlockInfoList, boolean isSortedTable) throws IOException {
 
     Map<String, List<DataFileFooter>> segmentBlockInfoMapping = new HashMap<>();
     for (TableBlockInfo blockInfo : tableBlockInfoList) {
@@ -122,9 +136,15 @@ public class CarbonCompactionUtil {
       // in getting the schema last updated time based on which compaction flow is decided that
       // whether it will go to restructure compaction flow or normal compaction flow.
       // This decision will impact the compaction performance so it needs to be decided carefully
-      if (null != blockInfo.getDetailInfo()
-          && blockInfo.getDetailInfo().getSchemaUpdatedTimeStamp() == 0L) {
+      final BlockletInfo blockletInfo = blockInfo.getDetailInfo().getBlockletInfo();
+      if (null != blockInfo.getDetailInfo() && (
+          blockInfo.getDetailInfo().getSchemaUpdatedTimeStamp() == 0L || null == blockletInfo
+              || null == blockletInfo.isSorted() || !blockletInfo.isSorted())) {
         dataFileMatadata = CarbonUtil.readMetadataFile(blockInfo, true);
+        if (null == dataFileMatadata.isSorted()) {
+          dataFileMatadata.setSorted(isSortedTable);
+        }
+        blockInfo.setDataFileFooter(dataFileMatadata);
       } else {
         dataFileMatadata = CarbonUtil.readMetadataFile(blockInfo);
       }
@@ -400,24 +420,56 @@ public class CarbonCompactionUtil {
    * @param tableLastUpdatedTime
    * @return
    */
-  public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
-      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping, long tableLastUpdatedTime) {
-    boolean restructuredBlockExists = false;
-    for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
-      String segmentId = taskMap.getKey();
+  public static boolean checkIfAnyRestructuredBlockExists(
+      Map<String, TaskBlockInfo> segmentMapping,
+      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping,
+      long tableLastUpdatedTime) {
+
+    for (Map.Entry<String, TaskBlockInfo> segmentEntry : segmentMapping.entrySet()) {
+      String segmentId = segmentEntry.getKey();
       List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
-      for (DataFileFooter dataFileFooter : listMetadata) {
-        // if schema modified timestamp is greater than footer stored schema timestamp,
-        // it indicates it is a restructured block
-        if (tableLastUpdatedTime > dataFileFooter.getSchemaUpdatedTimeStamp()) {
-          restructuredBlockExists = true;
-          break;
-        }
+
+      if (isRestructured(listMetadata, tableLastUpdatedTime)) {
+        return true;
       }
-      if (restructuredBlockExists) {
-        break;
+    }
+
+    return false;
+  }
+
+  /**
+   * Returns whether any element in the list of DataFileFooter
+   * is restructured.
+   *
+   * @param listMetadata
+   * @param tableLastUpdatedTime
+   * @return
+   */
+  public static boolean isRestructured(List<DataFileFooter> listMetadata,
+      long tableLastUpdatedTime) {
+    /*
+     * TODO: only in case of add and drop this variable should be true
+     */
+    for (DataFileFooter dataFileFooter : listMetadata) {
+      // if schema modified timestamp is greater than footer stored schema timestamp,
+      // it indicates it is a restructured block
+      if (tableLastUpdatedTime > dataFileFooter.getSchemaUpdatedTimeStamp()) {
+        return true;
       }
     }
-    return restructuredBlockExists;
+    return false;
   }
+
+  /**
+   * Returns whether the carbondata file described by this DataFileFooter
+   * contains sorted data.
+   *
+   * @param footer
+   * @return
+   * @throws IOException
+   */
+  public static boolean isSorted(DataFileFooter footer) throws IOException {
+    return footer.isSorted();
+  }
+
 }
index 34f0572..a1d5b43 100644 (file)
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
@@ -141,6 +142,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
 
   private SortParameters sortParameters;
 
+  private CarbonColumn[] noDicAndComplexColumns;
+
   public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, CarbonTable carbonTable,
       SegmentProperties segmentProperties, CompactionType compactionType, String tableName,
       PartitionSpec partitionSpec) {
@@ -157,21 +160,25 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * This method will iterate over the query result and convert it into a format compatible
    * for data loading
    *
-   * @param resultIteratorList
+   * @param unsortedResultIteratorList
+   * @param sortedResultIteratorList
+   * @return if the compaction is success or not
+   * @throws Exception
    */
-  public boolean execute(List<RawResultIterator> resultIteratorList) throws Exception {
+  public boolean execute(List<RawResultIterator> unsortedResultIteratorList,
+      List<RawResultIterator> sortedResultIteratorList) throws Exception {
     boolean isCompactionSuccess = false;
     try {
       initTempStoreLocation();
       initSortDataRows();
       dataTypes = CarbonDataProcessorUtil.initDataType(carbonTable, tableName, measureCount);
-      processResult(resultIteratorList);
+      processResult(unsortedResultIteratorList);
       // After delete command, if no records are fetched from one split,
       // below steps are not required to be initialized.
       if (isRecordFound) {
         initializeFinalThreadMergerForMergeSort();
         initDataHandler();
-        readAndLoadDataFromSortTempFiles();
+        readAndLoadDataFromSortTempFiles(sortedResultIteratorList);
       }
       isCompactionSuccess = true;
     } catch (Exception e) {
@@ -372,10 +379,15 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
   /**
    * This method will read sort temp files, perform merge sort and add it to store for data loading
    */
-  private void readAndLoadDataFromSortTempFiles() throws Exception {
+  private void readAndLoadDataFromSortTempFiles(List<RawResultIterator> sortedRawResultIterator)
+      throws Exception {
     try {
       intermediateFileMerger.finish();
       finalMerger.startFinalMerge();
+      if (sortedRawResultIterator != null && sortedRawResultIterator.size() > 0) {
+        finalMerger.addInMemoryRawResultIterator(sortedRawResultIterator, segmentProperties,
+            noDicAndComplexColumns, dataTypes);
+      }
       while (finalMerger.hasNext()) {
         Object[] row = finalMerger.next();
         dataHandler.addDataToStore(new CarbonRow(row));
@@ -500,6 +512,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
             tempStoreLocation, carbonStoreLocation);
     carbonFactDataHandlerModel.setSegmentId(carbonLoadModel.getSegmentId());
     setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel);
+    this.noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns();
     dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel);
     try {
       dataHandler.initialise();
index 83e630b..5566422 100644 (file)
@@ -18,6 +18,7 @@ package org.apache.carbondata.processing.merger;
 
 import java.io.IOException;
 import java.util.AbstractQueue;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -103,15 +104,19 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
    * Merge function
    *
    */
-  public boolean execute(List<RawResultIterator> resultIteratorList) throws Exception {
-    initRecordHolderHeap(resultIteratorList);
+  public boolean execute(List<RawResultIterator> unsortedResultIteratorList,
+      List<RawResultIterator> sortedResultIteratorList) throws Exception {
+    List<RawResultIterator> finalIteratorList = new ArrayList<>(unsortedResultIteratorList);
+    finalIteratorList.addAll(sortedResultIteratorList);
+
+    initRecordHolderHeap(finalIteratorList);
     boolean mergeStatus = false;
     int index = 0;
     boolean isDataPresent = false;
     try {
 
       // add all iterators to the queue
-      for (RawResultIterator leaftTupleIterator : resultIteratorList) {
+      for (RawResultIterator leaftTupleIterator : finalIteratorList) {
         this.recordHolderHeap.add(leaftTupleIterator);
         index++;
       }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/InMemorySortTempChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/InMemorySortTempChunkHolder.java
new file mode 100644 (file)
index 0000000..6b2af25
--- /dev/null
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.sort.sortdata;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+
+/**
+ * Adapter for RawResultIterator. This will be used for sorted
+ * carbondata files during compaction.
+ */
+public class InMemorySortTempChunkHolder extends SortTempFileChunkHolder {
+
+  /**
+   * Iterator over carbondata file
+   */
+  private final RawResultIterator rawResultIterator;
+
+  /**
+   * Used to convert RawResultIterator output row to CarbonRow
+   */
+  private SegmentProperties segmentProperties;
+
+  /**
+   * Used to convert RawResultIterator output row to CarbonRow
+   */
+  private CarbonColumn[] noDicAndComplexColumns;
+
+  /**
+   * Used to store Measure Data Type
+   */
+  private DataType[] measureDataType;
+
+  public InMemorySortTempChunkHolder(RawResultIterator rawResultIterator,
+      SegmentProperties segmentProperties, CarbonColumn[] noDicAndComplexColumns,
+      SortParameters sortParameters, DataType[] measureDataType) {
+    super(sortParameters);
+    this.rawResultIterator = rawResultIterator;
+    this.segmentProperties = segmentProperties;
+    this.noDicAndComplexColumns = noDicAndComplexColumns;
+    this.measureDataType = measureDataType;
+  }
+
+  public void initialise() {
+    // Not required for In memory case as it will not initialize anything
+    throw new UnsupportedOperationException("Operation Not supported");
+  }
+
+  /**
+   * 1. Read row from RawResultIterator
+   * 2. Convert it to IntermediateSortTempRow
+   * 3. Store it in memory to read through getRow() method
+   */
+  public void readRow() {
+    Object[] row = this.rawResultIterator.next();
+    //TODO add code to get directly Object[] Instead Of CarbonRow Object
+    CarbonRow carbonRow =
+        WriteStepRowUtil.fromMergerRow(row, segmentProperties, noDicAndComplexColumns);
+    Object[] data = carbonRow.getData();
+    Object[] measuresValue = (Object[]) data[WriteStepRowUtil.MEASURE];
+    for (int i = 0; i < measuresValue.length; i++) {
+      measuresValue[i] = getConvertedMeasureValue(measuresValue[i], measureDataType[i]);
+    }
+    returnRow = new IntermediateSortTempRow((int[]) data[WriteStepRowUtil.DICTIONARY_DIMENSION],
+        (Object[]) data[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX], measuresValue);
+
+  }
+
+  public int getEntryCount() {
+    // this will not be used for intermediate sorting
+    throw new UnsupportedOperationException("Operation Not supported");
+  }
+
+  /**
+   * below method will be used to check whether any more records are present
+   * in file or not
+   *
+   * @return more row present in file
+   */
+  public boolean hasNext() {
+    return this.rawResultIterator.hasNext();
+  }
+
+  @Override public boolean equals(Object obj) {
+    return super.equals(obj);
+  }
+
+  @Override public int hashCode() {
+    int hash = rawResultIterator.hashCode();
+    hash += segmentProperties.hashCode();
+    return hash;
+  }
+
+  /**
+   * Below method will be used to close streams
+   */
+  public void closeStream() {
+    rawResultIterator.close();
+  }
+
+  /** Below method will be used to get the sort temp row
+   *
+   * @return row
+   */
+  public IntermediateSortTempRow getRow() {
+    return returnRow;
+  }
+
+  /**
+   * This method will convert the spark decimal to java big decimal type
+   *
+   * @param value
+   * @param type
+   * @return
+   */
+  private Object getConvertedMeasureValue(Object value, DataType type) {
+    if (DataTypes.isDecimal(type)) {
+      if (value != null) {
+        value = DataTypeUtil.getDataTypeConverter().convertFromDecimalToBigDecimal(value);
+      }
+      return value;
+    } else {
+      return value;
+    }
+  }
+}
index d2add29..d243749 100644 (file)
@@ -35,7 +35,11 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
@@ -56,11 +60,6 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
   private static final Object LOCKOBJECT = new Object();
 
   /**
-   * fileCounter
-   */
-  private int fileCounter;
-
-  /**
    * recordHolderHeap
    */
   private AbstractQueue<SortTempFileChunkHolder> recordHolderHeapLocal;
@@ -114,6 +113,33 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
     startSorting(filesToMerge);
   }
 
+  /**
+   * Below method will be used to add in-memory raw result iterators to the priority queue.
+   * This will be called in case of compaction, when it is compacting both sorted and
+   * unsorted carbondata files.
+   * This method will add each sorted file's RawResultIterator to the priority queue using
+   * InMemorySortTempChunkHolder as a wrapper.
+   *
+   * @param sortedRawResultMergerList
+   * @param segmentProperties
+   * @param noDicAndComplexColumns
+   * @throws CarbonSortKeyAndGroupByException
+   */
+  public void addInMemoryRawResultIterator(List<RawResultIterator> sortedRawResultMergerList,
+      SegmentProperties segmentProperties, CarbonColumn[] noDicAndComplexColumns,
+      DataType[] measureDataType)
+      throws CarbonSortKeyAndGroupByException {
+    for (RawResultIterator rawResultIterator : sortedRawResultMergerList) {
+      InMemorySortTempChunkHolder inMemorySortTempChunkHolder =
+          new InMemorySortTempChunkHolder(rawResultIterator, segmentProperties,
+              noDicAndComplexColumns, sortParameters, measureDataType);
+      if (inMemorySortTempChunkHolder.hasNext()) {
+        inMemorySortTempChunkHolder.readRow();
+      }
+      recordHolderHeapLocal.add(inMemorySortTempChunkHolder);
+    }
+  }
+
   private List<File> getFilesToMergeSort() {
     final int rangeId = sortParameters.getRangeId();
     FileFilter fileFilter = new FileFilter() {
@@ -143,18 +169,17 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
    * @throws CarbonSortKeyAndGroupByException
    */
   private void startSorting(List<File> files) throws CarbonDataWriterException {
-    this.fileCounter = files.size();
-    if (fileCounter == 0) {
+    if (files.size() == 0) {
       LOGGER.info("No files to merge sort");
       return;
     }
 
     LOGGER.info("Started Final Merge");
 
-    LOGGER.info("Number of temp file: " + this.fileCounter);
+    LOGGER.info("Number of temp file: " + files.size());
 
     // create record holder heap
-    createRecordHolderQueue();
+    createRecordHolderQueue(files.size());
 
     // iterate over file list and create chunk holder and add to heap
     LOGGER.info("Started adding first record from each file");
@@ -207,9 +232,9 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
    * This method will be used to create the heap which will be used to hold
    * the chunk of data
    */
-  private void createRecordHolderQueue() {
+  private void createRecordHolderQueue(int size) {
     // creating record holder heap
-    this.recordHolderHeapLocal = new PriorityQueue<SortTempFileChunkHolder>(fileCounter);
+    this.recordHolderHeapLocal = new PriorityQueue<SortTempFileChunkHolder>(size);
   }
 
   private synchronized void notifyFailure(Throwable throwable) {
@@ -256,9 +281,6 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
       // if chunk is empty then close the stream
       poll.closeStream();
 
-      // change the file counter
-      --this.fileCounter;
-
       // reaturn row
       return row;
     }
@@ -285,7 +307,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
    * @return more element is present
    */
   public boolean hasNext() {
-    return this.fileCounter > 0;
+    return this.recordHolderHeapLocal.size() > 0;
   }
 
   public void close() {
index ef9c3fa..82e6b37 100644 (file)
@@ -70,7 +70,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   /**
    * return row
    */
-  private IntermediateSortTempRow returnRow;
+  protected IntermediateSortTempRow returnRow;
   private int readBufferSize;
   private String compressorName;
 
@@ -96,10 +96,18 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    * totalRecordFetch
    */
   private int totalRecordFetch;
-  private TableFieldStat tableFieldStat;
+  protected TableFieldStat tableFieldStat;
   private SortStepRowHandler sortStepRowHandler;
-  private Comparator<IntermediateSortTempRow> comparator;
+  protected Comparator<IntermediateSortTempRow> comparator;
   private boolean convertToActualField;
+
+  public SortTempFileChunkHolder(SortParameters sortParameters) {
+    this.tableFieldStat = new TableFieldStat(sortParameters);
+    this.comparator =
+        new IntermediateSortTempRowComparator(tableFieldStat.getIsSortColNoDictFlags(),
+            tableFieldStat.getNoDictDataType());
+  }
+
   /**
    * Constructor to initialize
    *
@@ -109,14 +117,12 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    */
   public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, String tableName,
       boolean convertToActualField) {
+    this(sortParameters);
     // set temp file
     this.tempFile = tempFile;
     this.readBufferSize = sortParameters.getBufferSize();
     this.compressorName = sortParameters.getSortTempCompressorName();
-    this.tableFieldStat = new TableFieldStat(sortParameters);
     this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
-    this.comparator = new IntermediateSortTempRowComparator(
-        tableFieldStat.getIsSortColNoDictFlags(), tableFieldStat.getNoDictDataType());
     this.executorService = Executors
         .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
     this.convertToActualField = convertToActualField;
index c0fab18..9d8202e 100644 (file)
@@ -411,6 +411,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel
         .setColumnLocalDictGenMap(CarbonUtil.getLocalDictionaryModel(carbonTable));
     carbonFactDataHandlerModel.setVarcharDimIdxInNoDict(varcharDimIdxInNoDict);
+    carbonFactDataHandlerModel.sortScope = carbonTable.getSortScope();
     return carbonFactDataHandlerModel;
   }
 
index 8ed0bf7..472f143 100644 (file)
@@ -54,6 +54,8 @@ import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
 
+import static org.apache.carbondata.core.constants.SortScopeOptions.SortScope.NO_SORT;
+
 import org.apache.log4j.Logger;
 
 public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
@@ -405,6 +407,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     IndexHeader indexHeader = CarbonMetadataUtil
         .getIndexHeader(localCardinality, thriftColumnSchemaList, model.getBucketId(),
             model.getSchemaUpdatedTimeStamp());
+    indexHeader.setIs_sort(model.getSortScope() != null && model.getSortScope() != NO_SORT);
     // get the block index info thrift
     List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
     String indexFileName;
index ccbc544..dc2268c 100644 (file)
@@ -91,7 +91,10 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
       LOGGER.info("Blocklet size configure for table is: " + blockletSizeThreshold);
     }
     blockletDataHolder = new BlockletDataHolder(fallbackExecutorService, model);
-    isSorted = model.getSortScope() != NO_SORT;
+    if (model.getSortScope() != null) {
+      isSorted = model.getSortScope() != NO_SORT;
+    }
+    LOGGER.info("Sort Scope : " + model.getSortScope());
   }
 
   @Override