[CARBONDATA-4317] Fix TPCDS performance issues
[carbondata.git] / integration / spark / src / main / scala / org / apache / spark / sql / execution / strategy / CarbonDataSourceScan.scala
index 2e1bb9668d1c2737ffb19e9364c8f9cba3cab027..31685b08b1a064842e60c78d65b9f829c56f4b64 100644 (file)
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 import org.apache.spark.sql.execution.WholeStageCodegenExec
 
 import org.apache.carbondata.core.metadata.schema.BucketingInfo
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.hadoop.CarbonProjection
 
@@ -44,7 +43,6 @@ case class CarbonDataSourceScan(
     output: Seq[Attribute],
     partitionFiltersWithoutDpp: Seq[SparkExpression],
     dataFilters: Seq[SparkExpression],
-    @transient readCommittedScope: ReadCommittedScope,
     @transient pushedDownProjection: CarbonProjection,
     @transient pushedDownFilters: Seq[Expression],
     directScanSupport: Boolean,
@@ -64,6 +62,10 @@ case class CarbonDataSourceScan(
     partitionFiltersWithDpp,
     segmentIds) {
 
+  val pushDownFiltersStr: String = seqToString(pushedDownFilters.map(_.getStatement))
+
+  val projectionColStr: String = seqToString(pushedDownProjection.getAllColumns)
+
   override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val info: BucketingInfo = relation.carbonTable.getBucketingInfo
     if (info != null) {
@@ -91,15 +93,18 @@ case class CarbonDataSourceScan(
     }
   }
 
+  def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
+
   override lazy val metadata: Map[String, String] = {
-    def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
     val metadata =
       Map(
-        "ReadSchema" -> seqToString(pushedDownProjection.getAllColumns),
+        "ReadSchema" -> projectionColStr,
         "Batched" -> supportsBatchOrColumnar.toString,
         "DirectScan" -> (supportsBatchOrColumnar && directScanSupport).toString,
-        "PushedFilters" -> seqToString(pushedDownFilters.map(_.getStatement)))
-    if (relation.carbonTable.isHivePartitionTable) {
+        "PushedFilters" -> pushDownFiltersStr)
+    // If the plan is canonicalized, its filter expressions are normalized. In that case,
+    // skip adding the partition filters and the partition count to the metadata.
+    if (!this.isCanonicalizedPlan && relation.carbonTable.isHivePartitionTable) {
       metadata + ("PartitionFilters" -> seqToString(partitionFiltersWithDpp)) +
         ("PartitionCount" -> selectedPartitions.size.toString)
     } else {
@@ -142,14 +147,40 @@ case class CarbonDataSourceScan(
       outputAttibutesAfterNormalizingExpressionIds,
       QueryPlan.normalizePredicates(partitionFiltersWithoutDpp, output),
       QueryPlan.normalizePredicates(dataFilters, output),
-      null,
-      null,
+      pushedDownProjection,
       Seq.empty,
       directScanSupport,
       extraRDD,
       tableIdentifier,
-      selectedCatalogPartitions,
+      Seq.empty,
       QueryPlan.normalizePredicates(partitionFiltersWithDpp, output)
     )
   }
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case scan: CarbonDataSourceScan =>
+        if (scan.relation == relation) {
+          var currentPlan = this
+          var otherPlan = scan
+          // The plans being compared are not always canonicalized. In that case, comparing
+          // pushedDownFilters will not match, since the objects are different. Canonicalize the
+          // plans before comparison so that the exchange can be reused for better performance.
+          if (pushedDownFilters.nonEmpty && scan.pushedDownFilters.nonEmpty) {
+            otherPlan = scan.canonicalized.asInstanceOf[CarbonDataSourceScan]
+            currentPlan = this.canonicalized.asInstanceOf[CarbonDataSourceScan]
+          }
+          // compare metadata, partition filter and data filter expressions
+          currentPlan.metadata == otherPlan.metadata &&
+          currentPlan.partitionFiltersWithDpp.toList.asJava
+            .containsAll(otherPlan.partitionFiltersWithDpp.toList.asJava) &&
+          (currentPlan.dataFilters == otherPlan.dataFilters ||
+           QueryPlan.normalizePredicates(currentPlan.dataFilters, currentPlan.output)
+           == QueryPlan.normalizePredicates(otherPlan.dataFilters, otherPlan.output))
+        } else {
+          false
+        }
+      case _ => false
+    }
+  }
 }