[CARBONDATA-3217] Optimize implicit filter expression performance by removing extra...
author: manishgupta88 <tomanishgupta18@gmail.com>
Thu, 27 Dec 2018 09:48:07 +0000 (15:18 +0530)
committer: kumarvishal09 <kumarvishal1802@gmail.com>
Fri, 4 Jan 2019 11:07:54 +0000 (16:37 +0530)
Fixed performance issue for Implicit filter column
1. Removed serialization of all the implicit filter values in each task. Instead, serialized values only for the blocks going to a particular task
2. Removed the 2-time deserialization of implicit filter values in the executor for each task; deserializing once is sufficient

This closes #3039

12 files changed:
core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ImplicitExpression.java [new file with mode: 0644]
core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java
core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ImplicitIncludeFilterExecutorImpl.java
core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/ImplicitColumnVisitor.java
hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala [new file with mode: 0644]
integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala

index 6b04cf7..e29dfef 100644 (file)
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 import org.apache.carbondata.core.indexstore.Blocklet;
@@ -485,7 +486,7 @@ public class BlockDataMap extends CoarseGrainDataMap
     String fileName = filePath + CarbonCommonConstants.FILE_SEPARATOR + new String(
         dataMapRow.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
         + CarbonTablePath.getCarbonDataExtension();
-    return fileName;
+    return FileFactory.getUpdatedFilePath(fileName);
   }
 
   private void addTaskSummaryRowToUnsafeMemoryStore(CarbonRowSchema[] taskSummarySchema,
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ImplicitExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ImplicitExpression.java
new file mode 100644 (file)
index 0000000..eab564e
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.expression.conditional;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.ExpressionResult;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Custom class to handle filter values for Implicit filter
+ */
+public class ImplicitExpression extends Expression {
+
+  /**
+   * map that contains the mapping of block id to the valid blocklets in that block which contain
+   * the data as per the applied filter
+   */
+  private Map<String, Set<Integer>> blockIdToBlockletIdMapping;
+
+  public ImplicitExpression(List<Expression> implicitFilterList) {
+    // initialize map with half the size of filter list as one block id can contain
+    // multiple blocklets
+    blockIdToBlockletIdMapping = new HashMap<>(implicitFilterList.size() / 2);
+    for (Expression value : implicitFilterList) {
+      String blockletPath = ((LiteralExpression) value).getLiteralExpValue().toString();
+      addBlockEntry(blockletPath);
+    }
+  }
+
+  public ImplicitExpression(Map<String, Set<Integer>> blockIdToBlockletIdMapping) {
+    this.blockIdToBlockletIdMapping = blockIdToBlockletIdMapping;
+  }
+
+  private void addBlockEntry(String blockletPath) {
+    String blockId =
+        blockletPath.substring(0, blockletPath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
+    Set<Integer> blockletIds = blockIdToBlockletIdMapping.get(blockId);
+    if (null == blockletIds) {
+      blockletIds = new HashSet<>();
+      blockIdToBlockletIdMapping.put(blockId, blockletIds);
+    }
+    blockletIds.add(Integer.parseInt(blockletPath.substring(blockId.length() + 1)));
+  }
+
+  @Override public ExpressionResult evaluate(RowIntf value)
+      throws FilterUnsupportedException, FilterIllegalMemberException {
+    throw new UnsupportedOperationException("Operation not supported for Implicit expression");
+  }
+
+  public Map<String, Set<Integer>> getBlockIdToBlockletIdMapping() {
+    return blockIdToBlockletIdMapping;
+  }
+
+  @Override public ExpressionType getFilterExpressionType() {
+    return ExpressionType.IMPLICIT;
+  }
+
+  @Override public void findAndSetChild(Expression oldExpr, Expression newExpr) {
+  }
+
+  @Override public String getString() {
+    StringBuilder value = new StringBuilder();
+    value.append("ImplicitExpression(");
+    for (Map.Entry<String, Set<Integer>> entry : blockIdToBlockletIdMapping.entrySet()) {
+      value.append(entry.getKey()).append(" --> ");
+      value.append(
+          StringUtils.join(entry.getValue().toArray(new Integer[entry.getValue().size()]), ","))
+          .append(";");
+      // return maximum of 100 characters in the getString method
+      if (value.length() > 100) {
+        value.append("...");
+        break;
+      }
+    }
+    value.append(')');
+    return value.toString();
+  }
+
+  @Override public String getStatement() {
+    return getString();
+  }
+}
\ No newline at end of file
index 8677a2d..bf7694a 100644 (file)
 package org.apache.carbondata.core.scan.filter;
 
 import java.io.Serializable;
-import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
 public class ColumnFilterInfo implements Serializable {
 
   private static final long serialVersionUID = 8181578747306832771L;
@@ -34,9 +32,9 @@ public class ColumnFilterInfo implements Serializable {
 
   /**
    * Implicit column filter values to be used for block and blocklet pruning
+   * Contains block id to its blocklet mapping
    */
-  private Set<String> implicitColumnFilterList;
-  private transient Set<String> implicitDriverColumnFilterList;
+  private Map<String, Set<Integer>> implicitColumnFilterBlockToBlockletsMap;
   private List<Integer> excludeFilterList;
   /**
    * maintain the no dictionary filter values list.
@@ -85,15 +83,16 @@ public class ColumnFilterInfo implements Serializable {
   public void setExcludeFilterList(List<Integer> excludeFilterList) {
     this.excludeFilterList = excludeFilterList;
   }
-  public Set<String> getImplicitColumnFilterList() {
-    return implicitColumnFilterList;
+  public Map<String, Set<Integer>> getImplicitColumnFilterBlockToBlockletsMap() {
+    return implicitColumnFilterBlockToBlockletsMap;
   }
 
-  public void setImplicitColumnFilterList(List<String> implicitColumnFilterList) {
+  public void setImplicitColumnFilterBlockToBlockletsMap(
+      Map<String, Set<Integer>> implicitColumnFilterBlockToBlockletsMap) {
     // this is done to improve the query performance. As the list of size increases time taken to
     // search in list will increase as list contains method uses equals check internally but set
     // will be very fast as it will directly use the has code to find the bucket and search
-    this.implicitColumnFilterList = new HashSet<>(implicitColumnFilterList);
+    this.implicitColumnFilterBlockToBlockletsMap = implicitColumnFilterBlockToBlockletsMap;
   }
 
   public List<Object> getMeasuresFilterValuesList() {
@@ -103,30 +102,4 @@ public class ColumnFilterInfo implements Serializable {
   public void setMeasuresFilterValuesList(List<Object> measuresFilterValuesList) {
     this.measuresFilterValuesList = measuresFilterValuesList;
   }
-
-  public Set<String> getImplicitDriverColumnFilterList() {
-    // this list is required to be populated only n case of driver, so in executor this check will
-    // avoid unnecessary loading of the driver filter list
-    if (implicitDriverColumnFilterList != null) {
-      return implicitDriverColumnFilterList;
-    }
-    synchronized (this) {
-      if (null == implicitDriverColumnFilterList) {
-        // populate only once. (can be called in multi-thread)
-        implicitDriverColumnFilterList = populateBlockIdListForDriverBlockPruning();
-      }
-    }
-    return implicitDriverColumnFilterList;
-  }
-
-  private Set<String> populateBlockIdListForDriverBlockPruning() {
-    Set<String> columnFilterList = new HashSet<>(implicitColumnFilterList.size());
-    String blockId;
-    for (String blockletId : implicitColumnFilterList) {
-      blockId =
-          blockletId.substring(0, blockletId.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
-      columnFilterList.add(blockId);
-    }
-    return columnFilterList;
-  }
 }
index f382f0b..15b8cba 100644 (file)
@@ -71,6 +71,7 @@ import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.ExpressionResult;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression;
+import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression;
 import org.apache.carbondata.core.scan.expression.conditional.InExpression;
 import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
 import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
@@ -1996,16 +1997,16 @@ public final class FilterUtil {
    * This method will get the no dictionary data based on filters and same
    * will be in DimColumnFilterInfo
    *
-   * @param evaluateResultListFinal
+   * @param implicitColumnFilterList
    * @param isIncludeFilter
    * @return
    */
-  public static ColumnFilterInfo getImplicitColumnFilterList(List<String> evaluateResultListFinal,
-      boolean isIncludeFilter) {
+  public static ColumnFilterInfo getImplicitColumnFilterList(
+      Map<String, Set<Integer>> implicitColumnFilterList, boolean isIncludeFilter) {
     ColumnFilterInfo columnFilterInfo = new ColumnFilterInfo();
     columnFilterInfo.setIncludeFilter(isIncludeFilter);
-    if (null != evaluateResultListFinal) {
-      columnFilterInfo.setImplicitColumnFilterList(evaluateResultListFinal);
+    if (null != implicitColumnFilterList) {
+      columnFilterInfo.setImplicitColumnFilterBlockToBlockletsMap(implicitColumnFilterList);
     }
     return columnFilterInfo;
   }
@@ -2019,6 +2020,44 @@ public final class FilterUtil {
    * @param expression
    */
   public static void removeInExpressionNodeWithPositionIdColumn(Expression expression) {
+    if (null != getImplicitFilterExpression(expression)) {
+      setTrueExpressionAsRightChild(expression);
+    }
+  }
+
+  /**
+   * This method will check for ColumnExpression with column name positionID and if found will
+   * replace the InExpression with true expression. This is done to stop serialization of List
+   * expression which is right children of InExpression as it can impact the query performance
+   * as the size of list grows bigger.
+   *
+   * @param expression
+   */
+  public static void setTrueExpressionAsRightChild(Expression expression) {
+    setNewExpressionForRightChild(expression, new TrueExpression(null));
+  }
+
+  /**
+   * Method to remove right child of the AND expression and set new expression for right child
+   *
+   * @param expression
+   * @param rightChild
+   */
+  public static void setNewExpressionForRightChild(Expression expression, Expression rightChild) {
+    // Remove the right expression node and point the expression to left node expression
+    expression.findAndSetChild(((AndExpression) expression).getRight(), rightChild);
+    LOGGER.info("In expression removed from the filter expression list to prevent it from"
+        + " serializing on executor");
+  }
+
+  /**
+   * This method will check if ImplicitFilter is present or not
+   * if it is present then return that ImplicitFilterExpression
+   *
+   * @param expression
+   * @return
+   */
+  public static Expression getImplicitFilterExpression(Expression expression) {
     ExpressionType filterExpressionType = expression.getFilterExpressionType();
     if (ExpressionType.AND == filterExpressionType) {
       Expression rightExpression = ((AndExpression) expression).getRight();
@@ -2030,14 +2069,30 @@ public final class FilterUtil {
           if (childExpression instanceof ColumnExpression && ((ColumnExpression) childExpression)
               .getColumnName().equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)) {
             // Remove the right expression node and point the expression to left node expression
-            expression
-                .findAndSetChild(((AndExpression) expression).getRight(), new TrueExpression(null));
-            LOGGER.info("In expression removed from the filter expression list to prevent it from"
-                + " serializing on executor");
+            // if 1st child is implicit column positionID then 2nd child will be
+            // implicit filter list
+            return children.get(1);
           }
         }
       }
     }
+    return null;
+  }
+
+  /**
+   * This method will create implicit expression and set as right child in the current expression
+   *
+   * @param expression
+   * @param blockIdToBlockletIdMapping
+   */
+  public static void createImplicitExpressionAndSetAsRightChild(Expression expression,
+      Map<String, Set<Integer>> blockIdToBlockletIdMapping) {
+    ColumnExpression columnExpression =
+        new ColumnExpression(CarbonCommonConstants.POSITION_ID, DataTypes.STRING);
+    ImplicitExpression implicitExpression = new ImplicitExpression(blockIdToBlockletIdMapping);
+    InExpression inExpression = new InExpression(columnExpression, implicitExpression);
+    setNewExpressionForRightChild(expression, inExpression);
+    LOGGER.info("Implicit expression added to the filter expression");
   }
 
   /**
index fd93e59..c932504 100644 (file)
@@ -19,7 +19,9 @@ package org.apache.carbondata.core.scan.filter.executer;
 
 import java.io.IOException;
 import java.util.BitSet;
+import java.util.Set;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
@@ -85,13 +87,24 @@ public class ImplicitIncludeFilterExecutorImpl
     boolean isScanRequired = false;
     String shortBlockId = CarbonTablePath.getShortBlockId(uniqueBlockPath);
     if (uniqueBlockPath.endsWith(".carbondata")) {
-      if (dimColumnEvaluatorInfo.getFilterValues().getImplicitDriverColumnFilterList()
-          .contains(shortBlockId)) {
+      if (dimColumnEvaluatorInfo.getFilterValues().getImplicitColumnFilterBlockToBlockletsMap()
+          .containsKey(shortBlockId)) {
         isScanRequired = true;
       }
-    } else if (dimColumnEvaluatorInfo.getFilterValues().getImplicitColumnFilterList()
-        .contains(shortBlockId)) {
-      isScanRequired = true;
+    } else {
+      // in case of CACHE_LEVEL = BLOCKLET, shortBlockId contains both block id and blocklet id
+      // so separating out block id for look up in implicit filter
+      String blockId =
+          shortBlockId.substring(0, shortBlockId.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
+      Set<Integer> blockletIds =
+          dimColumnEvaluatorInfo.getFilterValues().getImplicitColumnFilterBlockToBlockletsMap()
+              .get(blockId);
+      if (null != blockletIds) {
+        int idInUniqueBlockPath = Integer.parseInt(shortBlockId.substring(blockId.length() + 1));
+        if (blockletIds.contains(idInUniqueBlockPath)) {
+          isScanRequired = true;
+        }
+      }
     }
     if (isScanRequired) {
       bitSet.set(0);
index 32a3b12..1653a4a 100644 (file)
  */
 package org.apache.carbondata.core.scan.filter.resolver.resolverinfo.visitor;
 
-import java.io.IOException;
-import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
-import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
+import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.ColumnFilterInfo;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
@@ -43,18 +43,18 @@ public class ImplicitColumnVisitor implements ResolvedFilterInfoVisitorIntf {
    */
 
   @Override public void populateFilterResolvedInfo(ColumnResolvedFilterInfo visitableObj,
-      FilterResolverMetadata metadata) throws FilterUnsupportedException, IOException {
+      FilterResolverMetadata metadata) throws FilterUnsupportedException {
     if (visitableObj instanceof DimColumnResolvedFilterInfo) {
       ColumnFilterInfo resolvedFilterObject = null;
-      List<String> evaluateResultListFinal;
-      try {
-        evaluateResultListFinal = metadata.getExpression().evaluate(null).getListAsString();
-      } catch (FilterIllegalMemberException e) {
-        throw new FilterUnsupportedException(e);
+      if (metadata.getExpression() instanceof ImplicitExpression) {
+        Map<String, Set<Integer>> blockIdToBlockletIdMapping =
+            ((ImplicitExpression) metadata.getExpression()).getBlockIdToBlockletIdMapping();
+        resolvedFilterObject = FilterUtil
+            .getImplicitColumnFilterList(blockIdToBlockletIdMapping, metadata.isIncludeFilter());
+        ((DimColumnResolvedFilterInfo) visitableObj).setFilterValues(resolvedFilterObject);
+      } else {
+        throw new FilterUnsupportedException("Expression not an instance of implicit expression");
       }
-      resolvedFilterObject = FilterUtil
-          .getImplicitColumnFilterList(evaluateResultListFinal, metadata.isIncludeFilter());
-      ((DimColumnResolvedFilterInfo)visitableObj).setFilterValues(resolvedFilterObject);
     }
   }
 }
index 405ff53..de2451b 100644 (file)
@@ -23,8 +23,10 @@ import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
@@ -89,6 +91,11 @@ public class CarbonInputSplit extends FileSplit
   private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
 
   private String dataMapWritePath;
+  /**
+   * validBlockletIds will contain the valid blocklet ids for a given block that contains the data
+   * after pruning from driver. These will be used in executor for further pruning of blocklets
+   */
+  private Set<Integer> validBlockletIds;
 
   public CarbonInputSplit() {
     segment = null;
@@ -252,6 +259,11 @@ public class CarbonInputSplit extends FileSplit
     if (dataMapWriterPathExists) {
       dataMapWritePath = in.readUTF();
     }
+    int validBlockletIdCount = in.readShort();
+    validBlockletIds = new HashSet<>(validBlockletIdCount);
+    for (int i = 0; i < validBlockletIdCount; i++) {
+      validBlockletIds.add((int) in.readShort());
+    }
   }
 
   @Override public void write(DataOutput out) throws IOException {
@@ -278,6 +290,10 @@ public class CarbonInputSplit extends FileSplit
     if (dataMapWritePath != null) {
       out.writeUTF(dataMapWritePath);
     }
+    out.writeShort(getValidBlockletIds().size());
+    for (Integer blockletId : getValidBlockletIds()) {
+      out.writeShort(blockletId);
+    }
   }
 
   public List<String> getInvalidSegments() {
@@ -444,4 +460,16 @@ public class CarbonInputSplit extends FileSplit
   public Blocklet makeBlocklet() {
     return new Blocklet(getPath().getName(), blockletId);
   }
+
+  public Set<Integer> getValidBlockletIds() {
+    if (null == validBlockletIds) {
+      validBlockletIds = new HashSet<>();
+    }
+    return validBlockletIds;
+  }
+
+  public void setValidBlockletIds(Set<Integer> validBlockletIds) {
+    this.validBlockletIds = validBlockletIds;
+  }
+
 }
index 9b43877..24691f2 100644 (file)
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.profiler.ExplainCollector;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
@@ -611,7 +612,8 @@ m filterExpression
   @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
-    QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
+    QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext,
+        getFilterPredicates(taskAttemptContext.getConfiguration()));
     CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
     return new CarbonRecordReader<T>(queryModel, readSupport,
         taskAttemptContext.getConfiguration());
@@ -619,6 +621,12 @@ m filterExpression
 
   public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException {
+    return createQueryModel(inputSplit, taskAttemptContext,
+        getFilterPredicates(taskAttemptContext.getConfiguration()));
+  }
+
+  public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext,
+      Expression filterExpression) throws IOException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
 
@@ -630,9 +638,10 @@ m filterExpression
     } else {
       projectColumns = new String[]{};
     }
+    checkAndAddImplicitExpression(filterExpression, inputSplit);
     QueryModel queryModel = new QueryModelBuilder(carbonTable)
         .projectColumns(projectColumns)
-        .filterExpression(getFilterPredicates(configuration))
+        .filterExpression(filterExpression)
         .dataConverter(getDataTypeConverter(configuration))
         .build();
 
@@ -652,6 +661,36 @@ m filterExpression
     return queryModel;
   }
 
+  /**
+   * This method will create an Implicit Expression and set it as right child in the given
+   * expression
+   *
+   * @param expression
+   * @param inputSplit
+   */
+  private void checkAndAddImplicitExpression(Expression expression, InputSplit inputSplit) {
+    if (inputSplit instanceof CarbonMultiBlockSplit) {
+      CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+      List<CarbonInputSplit> splits = split.getAllSplits();
+      // iterate over all the splits and create block to blocklet mapping
+      Map<String, Set<Integer>> blockIdToBlockletIdMapping = new HashMap<>();
+      for (CarbonInputSplit carbonInputSplit : splits) {
+        Set<Integer> validBlockletIds = carbonInputSplit.getValidBlockletIds();
+        if (null != validBlockletIds && !validBlockletIds.isEmpty()) {
+          String uniqueBlockPath = carbonInputSplit.getPath().toString();
+          String shortBlockPath = CarbonTablePath
+              .getShortBlockId(uniqueBlockPath.substring(uniqueBlockPath.lastIndexOf("/Part") + 1));
+          blockIdToBlockletIdMapping.put(shortBlockPath, validBlockletIds);
+        }
+      }
+      if (!blockIdToBlockletIdMapping.isEmpty()) {
+        // create implicit expression and set as right child
+        FilterUtil
+            .createImplicitExpressionAndSetAsRightChild(expression, blockIdToBlockletIdMapping);
+      }
+    }
+  }
+
   public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
     String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
     //By default it uses dictionary decoder read class
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala
new file mode 100644 (file)
index 0000000..e28eaad
--- /dev/null
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.filterexpr
+
+import java.util
+
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.scan.expression.logical.{AndExpression, TrueExpression}
+import org.apache.carbondata.core.scan.filter.FilterUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+/**
+ * test class to verify the functionality of Implicit filter expression
+ */
+class TestImplicitFilterExpression extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    sql("drop table if exists implicit_test")
+    sql(
+      "create table implicit_test(firstname string, lastname string, age int) stored by " +
+      "'carbondata'")
+    sql("insert into implicit_test select 'bob','marshall',35")
+  }
+
+  test("test implicit filter expression for data pruning with valid implicit filter value") {
+    val query: DataFrame = sql("select count(*) from implicit_test where lastname='marshall'")
+    // 1 row should be returned for blockletId 0
+    verifyResultWithImplicitFilter(query, 1, 0)
+  }
+
+  test("test implicit filter expression for data pruning with invalid implicit filter value") {
+    val query: DataFrame = sql("select count(*) from implicit_test where lastname='marshall'")
+    // No row should be returned for blockletId 1
+    verifyResultWithImplicitFilter(query, 0, 1)
+  }
+
+  private def verifyResultWithImplicitFilter(query: DataFrame,
+      expectedResultCount: Int,
+      blockletId: Int): Unit = {
+    // from the plan extract the CarbonScanRDD
+    val scanRDD = query.queryExecution.sparkPlan.collect {
+      case scan: CarbonDataSourceScan if (scan.rdd.isInstanceOf[CarbonScanRDD[InternalRow]]) =>
+        scan.rdd.asInstanceOf[CarbonScanRDD[InternalRow]]
+    }.head
+    // get carbon relation
+    val relation: CarbonRelation = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore
+      .lookupRelation(Some("default"), "implicit_test")(sqlContext.sparkSession)
+      .asInstanceOf[CarbonRelation]
+    // get carbon table from carbon relation
+    val carbonTable = relation.carbonTable
+    // get the segment path
+    val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
+    // list carbondata files from the segment path
+    val files = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        if (file.getName.endsWith(CarbonTablePath.getCarbonDataExtension)) {
+          true
+        } else {
+          false
+        }
+      }
+    })
+    // assert that only 1 carbondata file exists
+    assert(files.length == 1)
+    // get the carbondata file complete path and name
+    val carbondataFileName = FileFactory.getUpdatedFilePath(files.head.getPath)
+    // get the short unique name for the carbondata file
+    // Example: complete file name will be as below
+    // /opt/db/implicit_test/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-1545986389020.carbondata
+    // short file name: 0/0/0-0_batchno0-0-0-1545986389020
+    val carbondataFileShortName = CarbonTablePath
+      .getShortBlockId(carbondataFileName.substring(carbondataFileName.lastIndexOf("/Part") + 1))
+    // create block to blocklet mapping indicating which all blocklets for a given block
+    // contain the data
+    val blockToBlockletMap = new util.HashMap[String, util.Set[Integer]]()
+    val blockletList = new util.HashSet[Integer]()
+    // add blocklet Id 0 to the list
+    blockletList.add(blockletId)
+    blockToBlockletMap.put(carbondataFileShortName, blockletList)
+    // create a new AND expression with True expression as right child
+    val filterExpression = new AndExpression(scanRDD.filterExpression, new TrueExpression(null))
+    // create implicit expression which will replace the right child (True expression)
+    FilterUtil.createImplicitExpressionAndSetAsRightChild(filterExpression, blockToBlockletMap)
+    // update the filter expression
+    scanRDD.filterExpression = filterExpression
+    // execute the query and get the result count
+    checkAnswer(query.toDF(), Seq(Row(expectedResultCount)))
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists implicit_test")
+  }
+
+}
index 28049b5..a32a8de 100644 (file)
@@ -50,11 +50,13 @@ import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression
 import org.apache.carbondata.core.scan.filter.FilterUtil
 import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.statusmanager.FileFormat
 import org.apache.carbondata.core.util._
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
@@ -416,7 +418,7 @@ class CarbonScanRDD[T: ClassTag](
     TaskMetricsMap.getInstance().registerThreadCallback()
     inputMetricsStats.initBytesReadCallback(context, inputSplit)
     val iterator = if (inputSplit.getAllSplits.size() > 0) {
-      val model = format.createQueryModel(inputSplit, attemptContext)
+      val model = format.createQueryModel(inputSplit, attemptContext, filterExpression)
       // one query id per table
       model.setQueryId(queryId)
       // get RecordReader by FileFormat
@@ -687,6 +689,33 @@ class CarbonScanRDD[T: ClassTag](
       if (identifiedPartitions.nonEmpty &&
           !checkForBlockWithoutBlockletInfo(identifiedPartitions)) {
         FilterUtil.removeInExpressionNodeWithPositionIdColumn(filterExpression)
+      } else if (identifiedPartitions.nonEmpty) {
+        // the below piece of code will serialize only the required blocklet ids
+        val filterValues = FilterUtil.getImplicitFilterExpression(filterExpression)
+        if (null != filterValues) {
+          val implicitExpression = filterValues.asInstanceOf[ImplicitExpression]
+          identifiedPartitions.foreach { partition =>
+            // for each partition get the list of input splits
+            val inputSplit = partition.asInstanceOf[CarbonSparkPartition].split.value
+            val splitList = if (inputSplit.isInstanceOf[CarbonMultiBlockSplit]) {
+              inputSplit.asInstanceOf[CarbonMultiBlockSplit].getAllSplits
+            } else {
+              new java.util.ArrayList().add(inputSplit.asInstanceOf[CarbonInputSplit])
+            }.asInstanceOf[java.util.List[CarbonInputSplit]]
+            // for each split, set the valid blocklet ids for its block path
+            splitList.asScala.map { split =>
+              val uniqueBlockPath = split.getPath.toString
+              val shortBlockPath = CarbonTablePath
+                .getShortBlockId(uniqueBlockPath
+                  .substring(uniqueBlockPath.lastIndexOf("/Part") + 1))
+              val blockletIds = implicitExpression.getBlockIdToBlockletIdMapping.get(shortBlockPath)
+              split.setValidBlockletIds(blockletIds)
+            }
+          }
+          // remove the right child of the expression here to prevent serialization of
+          // implicit filter values to executor
+          FilterUtil.setTrueExpressionAsRightChild(filterExpression)
+        }
       }
     }
   }
index b96b6a7..9835938 100644 (file)
@@ -350,9 +350,18 @@ object CarbonFilters {
           )
         }
       case In(left, right) if (isCarbonSupportedDataTypes(left)) =>
-        new InExpression(transformExpression(left),
-          new ListExpression(convertToJavaList(right.filter(_ != null).filter(!isNullLiteral(_))
-            .map(transformExpression))))
+        left match {
+          case left: AttributeReference if (left.name
+            .equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)) =>
+            new InExpression(transformExpression(left),
+              new ImplicitExpression(convertToJavaList(right.filter(_ != null)
+                .filter(!isNullLiteral(_))
+                .map(transformExpression))))
+          case _ =>
+            new InExpression(transformExpression(left),
+              new ListExpression(convertToJavaList(right.filter(_ != null).filter(!isNullLiteral(_))
+                .map(transformExpression))))
+        }
       case InSet(left, right) if (isCarbonSupportedDataTypes(left)) =>
         val validData = right.filter(_ != null).map { x =>
           val e = Literal(x.toString)