[CARBONDATA-3237] Fix presto carbon issues in dictionary include scenario
authorajantha-bhat <ajanthabhat@gmail.com>
Mon, 7 Jan 2019 09:20:11 +0000 (14:50 +0530)
committerkumarvishal09 <kumarvishal1802@gmail.com>
Wed, 9 Jan 2019 12:51:10 +0000 (18:21 +0530)
problem1: Decimal column with dictionary include cannot be read in
presto
cause: int is typecasted to decimal for dictionary columns in decimal stream reader.
solution: keep original data type as well as new data type for decimal
stream reader.

problem2: Optimize presto query time for dictionary include string column
currently, for each query, presto carbon creates dictionary block for string columns.
cause: This happens for each query and if cardinality is more , it takes more time to build.
solution: dictionary block is not required. we can lookup using normal dictionary lookup.

This closes #3055

integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala

index fb8300a..140e46b 100644 (file)
@@ -37,8 +37,6 @@ import org.apache.carbondata.presto.readers.ShortStreamReader;
 import org.apache.carbondata.presto.readers.SliceStreamReader;
 import org.apache.carbondata.presto.readers.TimestampStreamReader;
 
-import com.facebook.presto.spi.block.Block;
-
 public class CarbonVectorBatch {
 
   private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
@@ -63,8 +61,7 @@ public class CarbonVectorBatch {
     DataType[] dataTypes = readSupport.getDataTypes();
 
     for (int i = 0; i < schema.length; ++i) {
-      columns[i] = createDirectStreamReader(maxRows, dataTypes[i], schema[i], dictionaries[i],
-          readSupport.getDictionaryBlock(i));
+      columns[i] = createDirectStreamReader(maxRows, dataTypes[i], schema[i], dictionaries[i]);
     }
   }
 
@@ -79,7 +76,7 @@ public class CarbonVectorBatch {
   }
 
   private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType,
-      StructField field, Dictionary dictionary, Block dictionaryBlock) {
+      StructField field, Dictionary dictionary) {
     if (dataType == DataTypes.BOOLEAN) {
       return new BooleanStreamReader(batchSize, field.getDataType(), dictionary);
     } else if (dataType == DataTypes.SHORT) {
@@ -93,9 +90,10 @@ public class CarbonVectorBatch {
     } else if (dataType == DataTypes.DOUBLE) {
       return new DoubleStreamReader(batchSize, field.getDataType(), dictionary);
     } else if (dataType == DataTypes.STRING) {
-      return new SliceStreamReader(batchSize, field.getDataType(), dictionaryBlock);
+      return new SliceStreamReader(batchSize, field.getDataType(), dictionary);
     } else if (DataTypes.isDecimal(dataType)) {
-      return new DecimalSliceStreamReader(batchSize, (DecimalType) field.getDataType(), dictionary);
+      return new DecimalSliceStreamReader(batchSize, field.getDataType(), (DecimalType) dataType,
+          dictionary);
     } else {
       return new ObjectStreamReader(batchSize, field.getDataType());
     }
index 2976ca7..ddc855a 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Objects;
 import static java.math.RoundingMode.HALF_UP;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -57,10 +58,12 @@ public class DecimalSliceStreamReader extends CarbonColumnVectorImpl
   protected BlockBuilder builder;
   private Dictionary dictionary;
 
-  public DecimalSliceStreamReader(int batchSize,
-      org.apache.carbondata.core.metadata.datatype.DecimalType dataType, Dictionary dictionary) {
+  public DecimalSliceStreamReader(int batchSize, DataType dataType,
+      org.apache.carbondata.core.metadata.datatype.DecimalType decimalDataType,
+      Dictionary dictionary) {
     super(batchSize, dataType);
-    this.type = DecimalType.createDecimalType(dataType.getPrecision(), dataType.getScale());
+    this.type =
+        DecimalType.createDecimalType(decimalDataType.getPrecision(), decimalDataType.getScale());
     this.batchSize = batchSize;
     this.builder = type.createBlockBuilder(null, batchSize);
     this.dictionary = dictionary;
index 0d4b4f0..1e4688f 100644 (file)
 
 package org.apache.carbondata.presto.readers;
 
+import java.nio.charset.Charset;
+import java.util.Objects;
 import java.util.Optional;
 
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
@@ -44,27 +50,31 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
 
   protected BlockBuilder builder;
 
-  int[] values;
-
   private Block dictionaryBlock;
 
+  private boolean isLocalDict;
+
+  private Dictionary globalDictionary;
+
   public SliceStreamReader(int batchSize, DataType dataType,
-      Block dictionaryBlock) {
+      Dictionary dictionary) {
     super(batchSize, dataType);
+    this.globalDictionary = dictionary;
     this.batchSize = batchSize;
-    if (dictionaryBlock == null) {
-      this.builder = type.createBlockBuilder(null, batchSize);
-    } else {
-      this.dictionaryBlock = dictionaryBlock;
-      this.values = new int[batchSize];
-    }
+    this.builder = type.createBlockBuilder(null, batchSize);
   }
 
   @Override public Block buildBlock() {
     if (dictionaryBlock == null) {
       return builder.build();
     } else {
-      return new DictionaryBlock(batchSize, dictionaryBlock, values);
+      int[] dataArray;
+      if (isLocalDict) {
+        dataArray = (int[]) ((CarbonColumnVectorImpl) getDictionaryVector()).getDataArray();
+      } else {
+        dataArray = (int[]) getDataArray();
+      }
+      return new DictionaryBlock(batchSize, dictionaryBlock, dataArray);
     }
   }
 
@@ -95,22 +105,13 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
     dictOffsets[dictOffsets.length - 1] = size;
     dictionaryBlock = new VariableWidthBlock(dictionary.getDictionarySize(),
         Slices.wrappedBuffer(singleArrayDictValues), dictOffsets, Optional.of(nulls));
-    values = (int[]) ((CarbonColumnVectorImpl) getDictionaryVector()).getDataArray();
+    this.isLocalDict = true;
   }
-
   @Override public void setBatchSize(int batchSize) {
     this.batchSize = batchSize;
   }
 
-  @Override public void putInt(int rowId, int value) {
-    values[rowId] = value;
-  }
 
-  @Override public void putInts(int rowId, int count, int value) {
-    for (int i = 0; i < count; i++) {
-      values[rowId++] = value;
-    }
-  }
 
   @Override public void putByteArray(int rowId, byte[] value) {
     type.writeSlice(builder, wrappedBuffer(value));
@@ -142,5 +143,17 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
 
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
+    this.isLocalDict = false;
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    Object data = DataTypeUtil
+        .getDataBasedOnDataType(globalDictionary.getDictionaryValueForKey(value), DataTypes.STRING);
+    if (Objects.isNull(data)) {
+      builder.appendNull();
+    } else {
+      type.writeSlice(builder, wrappedBuffer(
+          ((String) data).getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))));
+    }
   }
 }
index 97deb6f..4bbd931 100644 (file)
@@ -37,7 +37,6 @@ import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
 class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
   private var dictionaries: Array[Dictionary] = _
   private var dataTypes: Array[DataType] = _
-  private var dictionaryBlock: Array[Block] = _
 
   /**
    * This initialization is done inside executor task
@@ -50,7 +49,6 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
 
     dictionaries = new Array[Dictionary](carbonColumns.length)
     dataTypes = new Array[DataType](carbonColumns.length)
-    dictionaryBlock = new Array[Block](carbonColumns.length)
 
     carbonColumns.zipWithIndex.foreach {
       case (carbonColumn, index) => if (carbonColumn.hasEncoding(Encoding.DICTIONARY) &&
@@ -66,13 +64,7 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
         dictionaries(index) = forwardDictionaryCache
           .get(new DictionaryColumnUniqueIdentifier(carbonTable.getAbsoluteTableIdentifier,
             carbonColumn.getColumnIdentifier, dataTypes(index), dictionaryPath))
-        // in case of string data type create dictionarySliceArray same as that of presto code
-        if (dataTypes(index).equals(DataTypes.STRING)) {
-          dictionaryBlock(index) = createDictionaryBlock(dictionaries(index))
-        }
-      }
-
-      else {
+      } else {
         dataTypes(index) = carbonColumn.getDataType
       }
     }
@@ -87,7 +79,7 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
    */
   private def createDictionaryBlock(dictionaryData: Dictionary): Block = {
     val chunks: DictionaryChunksWrapper = dictionaryData.getDictionaryChunks
-    val positionCount = chunks.getSize;
+    val positionCount = chunks.getSize
 
    // In dictionary there will be only one null and the key value will be 1 by default in carbon,
    // hence the isNullVector will be populated only once with null value it has no bearing on
@@ -127,16 +119,6 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
     throw new RuntimeException("UnSupported Method")
   }
 
-  /**
-   * Function to get the SliceArrayBlock with dictionary Data
-   *
-   * @param columnNo
-   * @return
-   */
-  def getDictionaryBlock(columnNo: Int): Block = {
-    dictionaryBlock(columnNo)
-  }
-
   def getDictionaries: Array[Dictionary] = {
     dictionaries
   }
index 115e868..9172b31 100644 (file)
@@ -239,6 +239,7 @@ object CarbonDataStoreCreator {
     bonus.setDataType(DataTypes.createDecimalType(10, 4))
     bonus.setPrecision(10)
     bonus.setScale(4)
+    bonus.setEncodingList(dictionaryEncoding)
     bonus.setEncodingList(invertedIndexEncoding)
     bonus.setColumnUniqueId(UUID.randomUUID().toString)
     bonus.setDimensionColumn(false)