Fix filtering manifests in unpartitioned tables. (#72)
authorRyan Blue <rdblue@users.noreply.github.com>
Mon, 14 Jan 2019 18:54:57 +0000 (10:54 -0800)
committerGitHub <noreply@github.com>
Mon, 14 Jan 2019 18:54:57 +0000 (10:54 -0800)
FilteredManifest only ran filters if there was a row filter AND a
partition filter, but it should run filteres if there is a row filter OR
a partition filter.

Because a filter may be null, this also updates the functions that
create evaluators to create an evaluator for alwaysTrue when the
expression is null.

core/src/main/java/com/netflix/iceberg/FilteredManifest.java
core/src/test/java/com/netflix/iceberg/TestFilterFiles.java [new file with mode: 0644]

index e73df8d..bea656c 100644 (file)
@@ -73,8 +73,8 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
   }
 
   Iterable<ManifestEntry> allEntries() {
-    if (rowFilter != null && rowFilter != Expressions.alwaysTrue() &&
-        partFilter != null && partFilter != Expressions.alwaysTrue()) {
+    if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
+        (partFilter != null && partFilter != Expressions.alwaysTrue())) {
       Evaluator evaluator = evaluator();
       InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
 
@@ -89,8 +89,8 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
   }
 
   Iterable<ManifestEntry> liveEntries() {
-    if (rowFilter != null && rowFilter != Expressions.alwaysTrue() &&
-        partFilter != null && partFilter != Expressions.alwaysTrue()) {
+    if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
+        (partFilter != null && partFilter != Expressions.alwaysTrue())) {
       Evaluator evaluator = evaluator();
       InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
 
@@ -108,8 +108,8 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
 
   @Override
   public Iterator<DataFile> iterator() {
-    if (rowFilter != null && rowFilter != Expressions.alwaysTrue() &&
-        partFilter != null && partFilter != Expressions.alwaysTrue()) {
+    if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
+        (partFilter != null && partFilter != Expressions.alwaysTrue())) {
       Evaluator evaluator = evaluator();
       InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
 
@@ -127,14 +127,24 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
 
   private Evaluator evaluator() {
     if (lazyEvaluator == null) {
-      this.lazyEvaluator = new Evaluator(reader.spec().partitionType(), partFilter);
+      if (partFilter != null) {
+        this.lazyEvaluator = new Evaluator(reader.spec().partitionType(), partFilter);
+      } else {
+        this.lazyEvaluator = new Evaluator(reader.spec().partitionType(), Expressions.alwaysTrue());
+      }
     }
     return lazyEvaluator;
   }
 
   private InclusiveMetricsEvaluator metricsEvaluator() {
     if (lazyMetricsEvaluator == null) {
-      this.lazyMetricsEvaluator = new InclusiveMetricsEvaluator(reader.spec().schema(), rowFilter);
+      if (rowFilter != null) {
+        this.lazyMetricsEvaluator = new InclusiveMetricsEvaluator(
+            reader.spec().schema(), rowFilter);
+      } else {
+        this.lazyMetricsEvaluator = new InclusiveMetricsEvaluator(
+            reader.spec().schema(), Expressions.alwaysTrue());
+      }
     }
     return lazyMetricsEvaluator;
   }
diff --git a/core/src/test/java/com/netflix/iceberg/TestFilterFiles.java b/core/src/test/java/com/netflix/iceberg/TestFilterFiles.java
new file mode 100644 (file)
index 0000000..cafff00
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.netflix.iceberg.expressions.Expressions;
+import com.netflix.iceberg.types.Conversions;
+import com.netflix.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static com.netflix.iceberg.types.Types.NestedField.required;
+import static org.junit.Assert.assertEquals;
+
+public class TestFilterFiles {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private final Schema schema = new Schema(
+    required(1, "id", Types.IntegerType.get()),
+    required(2, "data", Types.StringType.get())
+  );
+  private File tableDir = null;
+
+  @Before
+  public void setupTableDir() throws IOException {
+    this.tableDir = temp.newFolder();
+  }
+
+  @After
+  public void cleanupTables() {
+    TestTables.clearTables();
+  }
+
+  @Test
+  public void testFilterFilesUnpartitionedTable() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Table table = TestTables.create(tableDir, "test", schema, spec);
+    testFilterFiles(table);
+  }
+
+  @Test
+  public void testFilterFilesPartitionedTable() {
+    PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("data", 16).build();
+    Table table = TestTables.create(tableDir, "test", schema, spec);
+    testFilterFiles(table);
+  }
+
+  private void testFilterFiles(Table table) {
+    Map<Integer, ByteBuffer> lowerBounds = new HashMap<>();
+    Map<Integer, ByteBuffer> upperBounds = new HashMap<>();
+    lowerBounds.put(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1));
+    upperBounds.put(1, Conversions.toByteBuffer(Types.IntegerType.get(), 2));
+
+    Metrics metrics = new Metrics(2L, Maps.newHashMap(), Maps.newHashMap(),
+      Maps.newHashMap(), lowerBounds, upperBounds);
+
+    DataFile file = DataFiles.builder(table.spec())
+      .withPath("/path/to/file.parquet")
+      .withFileSizeInBytes(0)
+      .withMetrics(metrics)
+      .build();
+
+    table.newAppend().appendFile(file).commit();
+
+    table.refresh();
+
+    TableScan emptyScan = table.newScan().filter(Expressions.equal("id", 5));
+    assertEquals(0, Iterables.size(emptyScan.planFiles()));
+
+    TableScan nonEmptyScan = table.newScan().filter(Expressions.equal("id", 1));
+    assertEquals(1, Iterables.size(nonEmptyScan.planFiles()));
+  }
+}