DRILL-8190: Fix mongo project pushdown for queries with joins (#2652) master
author Volodymyr Vysotskyi <vvovyk@gmail.com>
Mon, 26 Sep 2022 06:27:27 +0000 (09:27 +0300)
committer GitHub <noreply@github.com>
Mon, 26 Sep 2022 06:27:27 +0000 (08:27 +0200)
18 files changed:
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcIntermediatePrelConverterRule.java
contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/plan/MongoPluginImplementor.java
contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoProjectPushDown.java
contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/rules/PhoenixIntermediatePrelConverterRule.java
exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdMaxRowCount.java
exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java
exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/ColumnConverterFactoryProvider.java
exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DynamicTypeResolverBuilder.java [new file with mode: 0644]
exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableBatchCreator.java
exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/EnumerableSubScan.java
exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/EnumerableIntermediatePrelConverterRule.java
exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/plan/VertexDrel.java
exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/PluginAggregateRel.java
exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rel/StoragePluginTableScan.java
exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginIntermediatePrelConverterRule.java
exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java

index 8995aab6d49329025d91ce4934bb84a18be1ca8b..62250b4662d1dde531b6e3d53952a163b9f2874a 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 
@@ -50,7 +51,7 @@ class JdbcIntermediatePrelConverterRule extends RelOptRule {
     VertexDrel in = call.rel(0);
     RelNode jdbcIntermediatePrel = new JdbcIntermediatePrel(
         in.getCluster(),
     VertexDrel in = call.rel(0);
     RelNode jdbcIntermediatePrel = new JdbcIntermediatePrel(
         in.getCluster(),
-        in.getTraitSet().replace(outTrait),
+        in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
         in.getInput(0), username);
     call.transformTo(jdbcIntermediatePrel);
   }
         in.getInput(0), username);
     call.transformTo(jdbcIntermediatePrel);
   }
index b55fa6b1730ab9d5ce50347b7ca4a6246bf772f5..774a23265bf148eb2437629210920c994ffd8449 100644 (file)
@@ -55,7 +55,6 @@ import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
 import org.apache.drill.exec.store.plan.rel.PluginSortRel;
 import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
 import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
 import org.apache.drill.exec.store.plan.rel.PluginSortRel;
 import org.apache.drill.exec.store.plan.rel.PluginUnionRel;
 import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
-import org.apache.drill.exec.util.Utilities;
 import org.bson.BsonDocument;
 import org.bson.BsonElement;
 import org.bson.BsonInt32;
 import org.bson.BsonDocument;
 import org.bson.BsonElement;
 import org.bson.BsonInt32;
@@ -220,8 +219,8 @@ public class MongoPluginImplementor extends AbstractPluginImplementor {
   }
 
   @Override
   }
 
   @Override
-  public void implement(StoragePluginTableScan scan) throws IOException {
-    groupScan = (MongoGroupScan) Utilities.getDrillTable(scan.getTable()).getGroupScan();
+  public void implement(StoragePluginTableScan scan) {
+    groupScan = (MongoGroupScan) scan.getGroupScan();
     operations = this.groupScan.getScanSpec().getOperations().stream()
       .map(BsonDocument::parse)
       .collect(Collectors.toList());
     operations = this.groupScan.getScanSpec().getOperations().stream()
       .map(BsonDocument::parse)
       .collect(Collectors.toList());
index a691443311e1ec2fca9e6909483de46c95b24fe5..372ec6d5e4e1694e0d8c9c59999ae1ef034cf950 100644 (file)
@@ -101,4 +101,26 @@ public class TestMongoProjectPushDown extends MongoTestBase {
         .go();
   }
 
         .go();
   }
 
+  /**
+   * DRILL-8190: runs a self-join over the donuts collection that aggregates {@code sales}
+   * from both sides, then checks the produced plan and the query result.
+   */
+  @Test // DRILL-8190
+  public void testProjectWithJoin() throws Exception {
+    String joinQuery =
+      "SELECT sum(s1.sales) s1_sales,\n" +
+      "sum(s2.sales) s2_sales\n" +
+      "FROM mongo.%s.`%s` s1\n" +
+      "JOIN mongo.%s.`%s` s2 ON s1._id = s2._id";
+
+    // plan check: the `_id`, `sales` column list is expected, the `**` column list is not
+    queryBuilder()
+      .sql(joinQuery, DONUTS_DB, DONUTS_COLLECTION, DONUTS_DB, DONUTS_COLLECTION)
+      .planMatcher()
+      .include("columns=\\[`_id`, `sales`]")
+      .exclude("columns=\\[`\\*\\*`")
+      .match();
+
+    // result check: both sums come from the same collection, so the baseline values are equal
+    testBuilder()
+      .sqlQuery(joinQuery, DONUTS_DB, DONUTS_COLLECTION, DONUTS_DB, DONUTS_COLLECTION)
+      .unOrdered()
+      .baselineColumns("s1_sales", "s2_sales")
+      .baselineValues(1194L, 1194L)
+      .go();
+  }
+
 }
 }
index c5eaaf1ddae09a34426d3de1a5975517ffd1abdb..7d6a88d0cd24b8380bd6c3f2fac2cdd21441217d 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 
@@ -48,7 +49,7 @@ final class PhoenixIntermediatePrelConverterRule extends RelOptRule {
     VertexDrel in = call.rel(0);
     RelNode intermediatePrel = new PhoenixIntermediatePrel(
       in.getCluster(),
     VertexDrel in = call.rel(0);
     RelNode intermediatePrel = new PhoenixIntermediatePrel(
       in.getCluster(),
-      in.getTraitSet().replace(outTrait),
+      in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
       in.getInput(0));
     call.transformTo(intermediatePrel);
   }
       in.getInput(0));
     call.transformTo(intermediatePrel);
   }
index 25c2fe2c39dc26ff30f61e43a4a63c3f6f6868af..cdfeb6c594f05eca6cf8e964b17b1afb31e8af0d 100644 (file)
@@ -30,6 +30,7 @@ import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Window;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.metadata.BuiltInMetadata;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdDistinctRowCount;
 import org.apache.calcite.rel.metadata.RelMdUtil;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdDistinctRowCount;
 import org.apache.calcite.rel.metadata.RelMdUtil;
@@ -42,7 +43,6 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.planner.common.DrillJoinRelBase;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.planner.common.DrillJoinRelBase;
@@ -61,12 +61,9 @@ import org.slf4j.LoggerFactory;
 public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount{
   private static final Logger logger = LoggerFactory.getLogger(DrillRelMdDistinctRowCount.class);
 
 public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount{
   private static final Logger logger = LoggerFactory.getLogger(DrillRelMdDistinctRowCount.class);
 
-  private static final DrillRelMdDistinctRowCount INSTANCE =
-      new DrillRelMdDistinctRowCount();
-
   public static final RelMetadataProvider SOURCE =
   public static final RelMetadataProvider SOURCE =
-      ReflectiveRelMetadataProvider.reflectiveSource(
-          BuiltInMethod.DISTINCT_ROW_COUNT.method, INSTANCE);
+    ReflectiveRelMetadataProvider.reflectiveSource(
+      new DrillRelMdDistinctRowCount(), BuiltInMetadata.DistinctRowCount.Handler.class);
 
   /**
    * We need to override this method since Calcite and Drill calculate
 
   /**
    * We need to override this method since Calcite and Drill calculate
index ed96025189f994e7fa387731cf1118ef1291dd56..09e555c9ff67231cca80f6c4c8586c9cc28c1bba 100644 (file)
 package org.apache.drill.exec.planner.cost;
 
 import org.apache.calcite.rel.core.Aggregate;
 package org.apache.drill.exec.planner.cost;
 
 import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.metadata.BuiltInMetadata;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdMaxRowCount;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdMaxRowCount;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.BuiltInMethod;
 
 public class DrillRelMdMaxRowCount extends RelMdMaxRowCount {
 
 
 public class DrillRelMdMaxRowCount extends RelMdMaxRowCount {
 
-  private static final DrillRelMdMaxRowCount INSTANCE = new DrillRelMdMaxRowCount();
-
   public static final RelMetadataProvider SOURCE =
   public static final RelMetadataProvider SOURCE =
-      ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.MAX_ROW_COUNT.method, INSTANCE);
+    ReflectiveRelMetadataProvider.reflectiveSource(
+      new DrillRelMdMaxRowCount(), BuiltInMetadata.MaxRowCount.Handler.class);
 
   // The method is overriden because of changes done in CALCITE-2991 and
   // TODO: should be discarded when CALCITE-1048 is fixed.
 
   // The method is overriden because of changes done in CALCITE-2991 and
   // TODO: should be discarded when CALCITE-1048 is fixed.
index eaaf7d1158d1b645ecf1b4a11111f7fa58947129..2f59ab30271e995ec8d6ab723b9d4148c6b8bee5 100644 (file)
@@ -23,11 +23,12 @@ import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.metadata.BuiltInMetadata;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdRowCount;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdRowCount;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.schema.Table;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.planner.common.DrillLimitRelBase;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.planner.common.DrillLimitRelBase;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
@@ -40,10 +41,12 @@ import org.apache.drill.exec.util.Utilities;
 import org.apache.drill.metastore.statistics.TableStatisticsKind;
 
 
 import org.apache.drill.metastore.statistics.TableStatisticsKind;
 
 
-public class DrillRelMdRowCount extends RelMdRowCount{
-  private static final DrillRelMdRowCount INSTANCE = new DrillRelMdRowCount();
+public class DrillRelMdRowCount extends RelMdRowCount {
 
 
-  public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.ROW_COUNT.method, INSTANCE);
+  public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(
+    new DrillRelMdRowCount(), BuiltInMetadata.RowCount.Handler.class);
+
+  private static final Double DEFAULT_SCAN_ROW_COUNT = 1e9;
 
   @Override
   public Double getRowCount(Aggregate rel, RelMetadataQuery mq) {
 
   @Override
   public Double getRowCount(Aggregate rel, RelMetadataQuery mq) {
@@ -96,7 +99,14 @@ public class DrillRelMdRowCount extends RelMdRowCount{
     PlannerSettings settings = PrelUtil.getSettings(rel.getCluster());
     // If guessing, return selectivity from RelMDRowCount
     if (DrillRelOptUtil.guessRows(rel)) {
     PlannerSettings settings = PrelUtil.getSettings(rel.getCluster());
     // If guessing, return selectivity from RelMDRowCount
     if (DrillRelOptUtil.guessRows(rel)) {
-      return super.getRowCount(rel, mq);
+      if (rel instanceof DrillScanRelBase
+        || rel.getTable().unwrap(Table.class).getStatistic().getRowCount() != null) {
+        return super.getRowCount(rel, mq);
+      } else {
+        // if table doesn't have row count statistics, return large row count
+        // to make sure that limit will be pushed down
+        return DEFAULT_SCAN_ROW_COUNT;
+      }
     }
     // Return rowcount from statistics, if available. Otherwise, delegate to parent.
     try {
     }
     // Return rowcount from statistics, if available. Otherwise, delegate to parent.
     try {
index 5732f910028f4df22a53c16615ae4305a772e4bc..bda0b16d0b65759b0e860ce65bb1f82c89922a50 100644 (file)
@@ -33,6 +33,7 @@ import org.apache.calcite.plan.volcano.RelSubset;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.BuiltInMetadata;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdSelectivity;
 import org.apache.calcite.rel.metadata.RelMdUtil;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdSelectivity;
 import org.apache.calcite.rel.metadata.RelMdUtil;
@@ -46,7 +47,6 @@ import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.rex.RexVisitor;
 import org.apache.calcite.rex.RexVisitorImpl;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.rex.RexVisitor;
 import org.apache.calcite.rex.RexVisitorImpl;
 import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.Util;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.DbGroupScan;
 import org.apache.calcite.util.Util;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.DbGroupScan;
@@ -71,8 +71,8 @@ import org.slf4j.LoggerFactory;
 public class DrillRelMdSelectivity extends RelMdSelectivity {
   private static final Logger logger = LoggerFactory.getLogger(DrillRelMdSelectivity.class);
 
 public class DrillRelMdSelectivity extends RelMdSelectivity {
   private static final Logger logger = LoggerFactory.getLogger(DrillRelMdSelectivity.class);
 
-  private static final DrillRelMdSelectivity INSTANCE = new DrillRelMdSelectivity();
-  public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(BuiltInMethod.SELECTIVITY.method, INSTANCE);
+  public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(
+    new DrillRelMdSelectivity(), BuiltInMetadata.Selectivity.Handler.class);
   /*
    * For now, we are treating all LIKE predicates to have the same selectivity irrespective of the number or position
    * of wildcard characters (%). This is no different than the present Drill/Calcite behaviour w.r.t to LIKE predicates.
   /*
    * For now, we are treating all LIKE predicates to have the same selectivity irrespective of the number or position
    * of wildcard characters (%). This is no different than the present Drill/Calcite behaviour w.r.t to LIKE predicates.
index 28aec71d0f20540ad10e8ff6b5365fca50a638dc..b91e4cbb13345bfe324028d84aa1b39ed34d29c5 100644 (file)
 package org.apache.drill.exec.store.enumerable;
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 package org.apache.drill.exec.store.enumerable;
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonTypeResolver;
 import org.apache.drill.exec.record.ColumnConverterFactory;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
 import org.apache.drill.exec.record.ColumnConverterFactory;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
+@JsonTypeResolver(DynamicTypeResolverBuilder.class)
 public interface ColumnConverterFactoryProvider {
 
   ColumnConverterFactory getFactory(TupleMetadata schema);
 public interface ColumnConverterFactoryProvider {
 
   ColumnConverterFactory getFactory(TupleMetadata schema);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DynamicTypeResolverBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/enumerable/DynamicTypeResolverBuilder.java
new file mode 100644 (file)
index 0000000..64fa986
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.enumerable;
+
+import com.fasterxml.jackson.databind.DeserializationConfig;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.jsontype.TypeDeserializer;
+import com.fasterxml.jackson.databind.jsontype.impl.StdTypeResolverBuilder;
+import org.reflections.Reflections;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Type resolver builder that extends the subtypes passed in by Jackson with every subtype
+ * of the base type that Reflections finds under the {@code org.apache.drill.exec.store} package.
+ */
+public class DynamicTypeResolverBuilder extends StdTypeResolverBuilder {
+
+  /**
+   * Builds the type deserializer from the scanned subtypes followed by the supplied ones.
+   *
+   * @param config deserialization configuration, passed through to the parent builder
+   * @param baseType base type whose raw class is used for the subtype lookup
+   * @param subtypes subtypes already known to the caller; appended after the scanned ones
+   * @return the deserializer created by the parent builder for the combined subtype list
+   */
+  @Override
+  public TypeDeserializer buildTypeDeserializer(DeserializationConfig config,
+    JavaType baseType, Collection<NamedType> subtypes) {
+
+    Reflections reflections = new Reflections("org.apache.drill.exec.store");
+    @SuppressWarnings("unchecked")
+    Class<Object> rawClass = (Class<Object>) baseType.getRawClass();
+    // Collectors.toList() makes no mutability guarantee for the returned list,
+    // so collect into an explicit ArrayList before appending to it.
+    List<NamedType> dynamicSubtypes = reflections.getSubTypesOf(rawClass).stream()
+      .map(NamedType::new)
+      .collect(Collectors.toCollection(ArrayList::new));
+    // NOTE(review): the parent builder appears to tolerate a null subtype collection;
+    // keep this override equally tolerant instead of failing in addAll - confirm.
+    if (subtypes != null) {
+      dynamicSubtypes.addAll(subtypes);
+    }
+
+    return super.buildTypeDeserializer(config, baseType, dynamicSubtypes);
+  }
+}
index 2dec45a61aaf86fd93b1f717b11ecaf84a2881cb..9930484a2b146fd8d733a5395b9501fdc5f3be6f 100644 (file)
@@ -61,7 +61,7 @@ public class EnumerableBatchCreator implements BatchCreator<EnumerableSubScan> {
     builder.providedSchema(subScan.getSchema());
 
     ManagedReader<SchemaNegotiator> reader = new EnumerableRecordReader(subScan.getColumns(),
     builder.providedSchema(subScan.getSchema());
 
     ManagedReader<SchemaNegotiator> reader = new EnumerableRecordReader(subScan.getColumns(),
-        subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath(), subScan.factoryProvider());
+        subScan.getFieldsMap(), subScan.getCode(), subScan.getSchemaPath(), subScan.getConverterFactoryProvider());
     ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(Collections.singletonList(reader).iterator());
     builder.setReaderFactory(readerFactory);
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
     ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(Collections.singletonList(reader).iterator());
     builder.setReaderFactory(readerFactory);
     builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
index 4476be8c53803a5705fc9a3c198a90c5d75f62ec..0c7245b6ccca6ac1b8c1314623d3b5bb0895ce16 100644 (file)
@@ -79,7 +79,7 @@ public class EnumerableSubScan extends AbstractSubScan {
     return schemaPath;
   }
 
     return schemaPath;
   }
 
-  public ColumnConverterFactoryProvider factoryProvider() {
+  public ColumnConverterFactoryProvider getConverterFactoryProvider() {
     return converterFactoryProvider;
   }
 }
     return converterFactoryProvider;
   }
 }
index 7272a36bf636bfbfac5916be12b2f3ba9780aaf6..c5ede3a22e869dd0210d21368b00fc61a435a965 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
 
 public class EnumerableIntermediatePrelConverterRule extends RelOptRule {
 import org.apache.drill.exec.planner.physical.Prel;
 
 public class EnumerableIntermediatePrelConverterRule extends RelOptRule {
@@ -48,7 +49,7 @@ public class EnumerableIntermediatePrelConverterRule extends RelOptRule {
     VertexDrel in = call.rel(0);
     RelNode intermediatePrel = new EnumerableIntermediatePrel(
         in.getCluster(),
     VertexDrel in = call.rel(0);
     RelNode intermediatePrel = new EnumerableIntermediatePrel(
         in.getCluster(),
-        in.getTraitSet().replace(outTrait),
+        in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
         in.getInput(0),
         context);
     call.transformTo(intermediatePrel);
         in.getInput(0),
         context);
     call.transformTo(intermediatePrel);
index edbc5912fbffa93652191e9685ad88984ec74659..d202ebdb8938e710058d616d586001ec6b788c42 100644 (file)
 package org.apache.drill.exec.store.enumerable.plan;
 
 import org.apache.calcite.plan.RelOptCluster;
 package org.apache.drill.exec.store.enumerable.plan;
 
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.exec.planner.logical.DrillImplementor;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.exec.planner.logical.DrillImplementor;
 import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.util.Utilities;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.List;
 
 
 import java.util.List;
 
+import static org.apache.drill.exec.planner.logical.DrillScanRel.STAR_COLUMN_COST;
+
 /**
  * The vertex simply holds the child nodes but contains its own traits.
  * Used for completing Drill logical planning when child nodes have some specific traits.
 /**
  * The vertex simply holds the child nodes but contains its own traits.
  * Used for completing Drill logical planning when child nodes have some specific traits.
@@ -51,4 +58,15 @@ public class VertexDrel extends SingleRel implements DrillRel {
   public LogicalOperator implement(DrillImplementor implementor) {
     throw new UnsupportedOperationException();
   }
   public LogicalOperator implement(DrillImplementor implementor) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Computes the cost of this vertex from its estimated row count and its column count.
+   * The column count is part of the cost so that a plan with an additional pushed plugin
+   * projection that reduces columns is considered better than a plan without it.
+   *
+   * @return cost built from the row count and (row count * column count), scaled by 0.1
+   */
+  @Override
+  public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    double rows = estimateRowCount(mq);
+
+    // a star query has no fixed column list, so a predefined column cost is used for it
+    double columns;
+    if (Utilities.isStarQuery(getRowType())) {
+      columns = STAR_COLUMN_COST;
+    } else {
+      columns = getRowType().getFieldCount();
+    }
+
+    RelOptCost unscaledCost = planner.getCostFactory().makeCost(rows, rows * columns, 0);
+    return unscaledCost.multiplyBy(0.1);
+  }
 }
 }
index 02885e90abce2c753424abf1a9802035caefc29f..3c7f115843c7811ddc9289a87d8ee3cb7cc730e6 100644 (file)
@@ -51,7 +51,7 @@ public class PluginAggregateRel extends DrillAggregateRelBase implements PluginR
 
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
 
   @Override
   public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-    return super.computeSelfCost(planner, mq).multiplyBy(0.1);
+    return super.computeLogicalAggCost(planner, mq).multiplyBy(0.1);
   }
 
   @Override
   }
 
   @Override
index 2be69ef7c988bef829e0ad803c90d63ebb6dcb8f..38525e2575a9f0a6810b5e7c05d2c7ed1562b367 100644 (file)
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.store.plan.rel;
 
 import org.apache.calcite.plan.RelOptCluster;
 package org.apache.drill.exec.store.plan.rel;
 
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelWriter;
@@ -27,11 +29,14 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.common.DrillScanRelBase;
 import org.apache.drill.exec.store.plan.PluginImplementor;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.common.DrillScanRelBase;
 import org.apache.drill.exec.store.plan.PluginImplementor;
+import org.apache.drill.exec.util.Utilities;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
 
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.drill.exec.planner.logical.DrillScanRel.STAR_COLUMN_COST;
+
 /**
  * Storage plugin table scan rel implementation.
  */
 /**
  * Storage plugin table scan rel implementation.
  */
@@ -75,6 +80,21 @@ public class StoragePluginTableScan extends DrillScanRelBase implements PluginRe
     return implementor.canImplement(this);
   }
 
     return implementor.canImplement(this);
   }
 
+  /**
+   * Computes the cost of this scan from its estimated row count and an adjusted column count.
+   * For a non-star scan the column count is the squared row type field count divided by the
+   * number of projected columns (at least 1), so that a scan projecting nested columns is
+   * preferable to a scan where only root columns are projected.
+   *
+   * @return cost built from the row count and (row count * column count), scaled by 0.1
+   */
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    List<SchemaPath> projected = groupScan.getColumns();
+
+    double columnCount;
+    if (Utilities.isStarQuery(projected)) {
+      columnCount = STAR_COLUMN_COST;
+    } else {
+      int fieldCount = getRowType().getFieldCount();
+      columnCount = Math.pow(fieldCount, 2) / Math.max(projected.size(), 1);
+    }
+
+    double rows = estimateRowCount(mq);
+    RelOptCost unscaledCost = planner.getCostFactory().makeCost(rows, rows * columnCount, 0);
+    return unscaledCost.multiplyBy(0.1);
+  }
+
   private static List<SchemaPath> getColumns(RelDataType rowType) {
     return rowType.getFieldList().stream()
       .map(filed -> filed.isDynamicStar()
   private static List<SchemaPath> getColumns(RelDataType rowType) {
     return rowType.getFieldList().stream()
       .map(filed -> filed.isDynamicStar()
index 279241efea3d82e8e7218f6260dd3befae9cf42f..a13dc25e3291983f1a706d8614d1152e61438653 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 import org.apache.drill.exec.store.plan.PluginImplementor;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.store.enumerable.plan.VertexDrel;
 import org.apache.drill.exec.store.plan.PluginImplementor;
@@ -53,7 +54,7 @@ public class PluginIntermediatePrelConverterRule extends RelOptRule {
     VertexDrel in = call.rel(0);
     RelNode intermediatePrel = new PluginIntermediatePrel(
         in.getCluster(),
     VertexDrel in = call.rel(0);
     RelNode intermediatePrel = new PluginIntermediatePrel(
         in.getCluster(),
-        in.getTraitSet().replace(outTrait),
+        in.getTraitSet().replace(outTrait).plus(DrillDistributionTrait.SINGLETON),
         in.getInput(0),
         implementorFactory);
     call.transformTo(intermediatePrel);
         in.getInput(0),
         implementorFactory);
     call.transformTo(intermediatePrel);
index 87f220180499f5fed7d3b5e11a0629a9ee951c78..1d160d07ed4ec528398be0b3cb5392c8409ca14c 100644 (file)
@@ -21,6 +21,7 @@ import java.util.Collection;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
@@ -33,8 +34,6 @@ import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.base.Predicate;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
 
 public class Utilities {
 
@@ -52,17 +51,13 @@ public class Utilities {
     int majorFragmentId = handle.getMajorFragmentId();
     int minorFragmentId = handle.getMinorFragmentId();
 
-    String fileName = String.format("%s//%s_%s_%s_%s", location, qid, majorFragmentId, minorFragmentId, tag);
-
-    return fileName;
+    return String.format("%s//%s_%s_%s_%s", location, qid, majorFragmentId, minorFragmentId, tag);
   }
 
   /**
    * Create {@link org.apache.drill.exec.proto.BitControl.QueryContextInformation} with given <i>defaultSchemaName</i>. Rest of the members of the
    * QueryContextInformation is derived from the current state of the process.
    *
-   * @param defaultSchemaName
-   * @param sessionId
    * @return A {@link org.apache.drill.exec.proto.BitControl.QueryContextInformation} with given <i>defaultSchemaName</i>.
    */
   public static QueryContextInformation createQueryContextInfo(final String defaultSchemaName,
@@ -82,22 +77,25 @@ public class Utilities {
    * @return The Drill version.
    */
   public static String getDrillVersion() {
-      String v = Utilities.class.getPackage().getImplementationVersion();
-      return v;
+    return Utilities.class.getPackage().getImplementationVersion();
   }
 
   /**
    * Return true if list of schema path has star column.
-   * @param projected
+   *
    * @return True if the list of {@link org.apache.drill.common.expression.SchemaPath}s has star column.
    */
   public static boolean isStarQuery(Collection<SchemaPath> projected) {
-    return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() {
-      @Override
-      public boolean apply(SchemaPath path) {
-        return Preconditions.checkNotNull(path).equals(SchemaPath.STAR_COLUMN);
-      }
-    }).isPresent();
+    return Preconditions.checkNotNull(projected, COL_NULL_ERROR).stream()
+      .anyMatch(SchemaPath::isDynamicStar);
+  }
+
+  /**
+   * Return true if the row type has star column.
+   */
+  public static boolean isStarQuery(RelDataType projected) {
+    return projected.getFieldNames().stream()
+      .anyMatch(SchemaPath.DYNAMIC_STAR::equals);
   }
 
   /**