Update to Spark 2.4 (#30)
author Ryan Blue <rdblue@users.noreply.github.com>
Mon, 10 Dec 2018 17:35:15 +0000 (09:35 -0800)
committer GitHub <noreply@github.com>
Mon, 10 Dec 2018 17:35:15 +0000 (09:35 -0800)
* Update to the Spark 2.4 API.
* Remove ORC support from iceberg-spark.
* Use Spark's data source Filter API instead of catalyst Expression for filter push-down (see the sketch after the file list).

12 files changed:
build.gradle
spark/src/main/java/com/netflix/iceberg/spark/data/SparkOrcReader.java [deleted file]
spark/src/main/java/com/netflix/iceberg/spark/data/SparkOrcWriter.java [deleted file]
spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
spark/src/test/java/com/netflix/iceberg/spark/data/CodegenExamples.java [deleted file]
spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java
spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java
spark/src/test/java/com/netflix/iceberg/spark/source/TestOrcScan.java [deleted file]
spark/src/test/java/com/netflix/iceberg/spark/source/TestOrcWrite.java [deleted file]
spark/src/test/java/com/netflix/iceberg/spark/source/TestSparkReadProjection.java
spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java

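Before the per-file diffs, a minimal sketch of the push-down change called out above: in the Spark 2.4 data source API, filters arrive as org.apache.spark.sql.sources.Filter values through SupportsPushDownFilters rather than as catalyst Expressions through SupportsPushDownCatalystFilters. The class below is illustrative only (the name is made up and it is not part of this commit); the real conversion into Iceberg expressions happens via SparkFilters.convert in the Reader.java diff further down.

import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.types.StructType;

// Hypothetical class name; not part of this commit.
class FilterPushdownSketch implements DataSourceReader, SupportsPushDownFilters {
  private Filter[] pushed = new Filter[0];

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    // Remember what Spark offered; a real source converts each Filter
    // (the diff does this with SparkFilters.convert) and keeps only the
    // ones it can translate.
    this.pushed = filters;
    // Return the filters Spark must still evaluate after the scan;
    // returning everything is the conservative choice.
    return filters;
  }

  @Override
  public Filter[] pushedFilters() {
    return pushed;
  }

  @Override
  public StructType readSchema() {
    return new StructType(); // placeholder: a real source derives this from its table schema
  }

  @Override
  public List<InputPartition<InternalRow>> planInputPartitions() {
    return Collections.emptyList(); // no scan tasks in this sketch
  }
}

Returning the full filter array keeps Spark re-evaluating every predicate after the scan, which stays correct even when the source also uses the pushed filters to prune files.
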
diff --git a/build.gradle b/build.gradle
index d41eb13..096156b 100644
@@ -75,7 +75,7 @@ subprojects {
     jacksonVersion = '2.6.7'
 
     scalaVersion = '2.11'
-    sparkVersion = '2.3.2'
+    sparkVersion = '2.4.0'
   }
 
   sourceCompatibility = '1.8'
@@ -160,8 +160,8 @@ project(':iceberg-orc') {
     compile("org.apache.orc:orc-core:$orcVersion:nohive") {
       exclude group: 'org.apache.hadoop', module: 'hadoop-common'
     }
-    
-    
+
+
     compileOnly("org.apache.hadoop:hadoop-client:$hadoopVersion") {
       exclude group: 'org.apache.avro', module: 'avro'
     }
@@ -190,7 +190,6 @@ project(':iceberg-spark') {
     compile project(':iceberg-api')
     compile project(':iceberg-common')
     compile project(':iceberg-core')
-    compile project(':iceberg-orc')
     compile project(':iceberg-parquet')
 
     compileOnly "org.apache.avro:avro:$avroVersion"
@@ -249,7 +248,6 @@ project(':iceberg-runtime') {
     shadow project(':iceberg-api')
     shadow project(':iceberg-common')
     shadow project(':iceberg-core')
-    shadow project(':iceberg-orc')
     shadow project(':iceberg-parquet')
     shadow project(':iceberg-spark')
     shadow project(':iceberg-pig')
@@ -302,7 +300,7 @@ project(':iceberg-presto-runtime') {
         shadow "org.apache.avro:avro:$avroVersion"
         shadow ("org.apache.hive:hive-standalone-metastore:$hiveVersion") {
             exclude group: 'org.apache.hadoop', module: 'hadoop-common'
-            exclude group: 'org.apache.orc', module: 'orc-core'
+//            exclude group: 'org.apache.orc', module: 'orc-core'
         }
     }
 
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/com/netflix/iceberg/spark/data/SparkOrcReader.java
deleted file mode 100644
index 6be855e..0000000
+++ /dev/null
@@ -1,870 +0,0 @@
-/*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.netflix.iceberg.spark.data;
-
-import com.netflix.iceberg.FileScanTask;
-import com.netflix.iceberg.Schema;
-import com.netflix.iceberg.io.InputFile;
-import com.netflix.iceberg.orc.ColumnIdMap;
-import com.netflix.iceberg.orc.ORC;
-import com.netflix.iceberg.orc.OrcIterator;
-import com.netflix.iceberg.orc.TypeConversion;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.FastHiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
-import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.storage.serde2.io.DateWritable;
-import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
-import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
-import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter;
-import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.array.ByteArrayMethods;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Converts the OrcIterator, which returns ORC's VectorizedRowBatch, to a
- * set of Spark's UnsafeRows.
- *
- * It minimizes allocations by reusing most of the objects in the implementation.
- */
-public class SparkOrcReader implements Iterator<InternalRow>, Closeable {
-  private final static int INITIAL_SIZE = 128 * 1024;
-  private final OrcIterator reader;
-  private final TypeDescription orcSchema;
-  private final UnsafeRow row;
-  private final BufferHolder holder;
-  private final UnsafeRowWriter writer;
-  private int nextRow = 0;
-  private VectorizedRowBatch current = null;
-  private Converter[] converter;
-
-  public SparkOrcReader(InputFile location,
-                        FileScanTask task,
-                        Schema readSchema) {
-    ColumnIdMap columnIds = new ColumnIdMap();
-    orcSchema = TypeConversion.toOrc(readSchema, columnIds);
-    reader = ORC.read(location)
-        .split(task.start(), task.length())
-        .schema(readSchema)
-        .build();
-    int numFields = readSchema.columns().size();
-    row = new UnsafeRow(numFields);
-    holder = new BufferHolder(row, INITIAL_SIZE);
-    writer = new UnsafeRowWriter(holder, numFields);
-    converter = new Converter[numFields];
-    for(int c=0; c < numFields; ++c) {
-      converter[c] = buildConverter(holder, orcSchema.getChildren().get(c));
-    }
-  }
-
-  @Override
-  public boolean hasNext() {
-    return (current != null && nextRow < current.size) || reader.hasNext();
-  }
-
-  @Override
-  public UnsafeRow next() {
-    if (current == null || nextRow >= current.size) {
-      current = reader.next();
-      nextRow = 0;
-    }
-    // Reset the holder to start the buffer over again.
-    // BufferHolder.reset does the wrong thing...
-    holder.cursor = Platform.BYTE_ARRAY_OFFSET;
-    writer.reset();
-    for(int c=0; c < current.cols.length; ++c) {
-      converter[c].convert(writer, c, current.cols[c], nextRow);
-    }
-    nextRow++;
-    return row;
-  }
-
-  @Override
-  public void close() throws IOException {
-    reader.close();
-  }
-
-  private static void printRow(SpecializedGetters row, TypeDescription schema) {
-    List<TypeDescription> children = schema.getChildren();
-    System.out.print("{");
-    for(int c = 0; c < children.size(); ++c) {
-      System.out.print("\"" + schema.getFieldNames().get(c) + "\": ");
-      printRow(row, c, children.get(c));
-    }
-    System.out.print("}");
-  }
-
-  private static void printRow(SpecializedGetters row, int ord, TypeDescription schema) {
-    switch (schema.getCategory()) {
-      case BOOLEAN:
-        System.out.print(row.getBoolean(ord));
-        break;
-      case BYTE:
-        System.out.print(row.getByte(ord));
-        break;
-      case SHORT:
-        System.out.print(row.getShort(ord));
-        break;
-      case INT:
-        System.out.print(row.getInt(ord));
-        break;
-      case LONG:
-        System.out.print(row.getLong(ord));
-        break;
-      case FLOAT:
-        System.out.print(row.getFloat(ord));
-        break;
-      case DOUBLE:
-        System.out.print(row.getDouble(ord));
-        break;
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        System.out.print("\"" + row.getUTF8String(ord) + "\"");
-        break;
-      case BINARY: {
-        byte[] bin = row.getBinary(ord);
-        if (bin == null) {
-          System.out.print("null");
-        } else {
-          System.out.print("[");
-          for (int i = 0; i < bin.length; ++i) {
-            if (i != 0) {
-              System.out.print(", ");
-            }
-            int v = bin[i] & 0xff;
-            if (v < 16) {
-              System.out.print("0" + Integer.toHexString(v));
-            } else {
-              System.out.print(Integer.toHexString(v));
-            }
-          }
-          System.out.print("]");
-        }
-        break;
-      }
-      case DECIMAL:
-        System.out.print(row.getDecimal(ord, schema.getPrecision(), schema.getScale()));
-        break;
-      case DATE:
-        System.out.print("\"" + new DateWritable(row.getInt(ord)) + "\"");
-        break;
-      case TIMESTAMP:
-        System.out.print("\"" + new Timestamp(row.getLong(ord)) + "\"");
-        break;
-      case STRUCT:
-        printRow(row.getStruct(ord, schema.getChildren().size()), schema);
-        break;
-      case LIST: {
-        TypeDescription child = schema.getChildren().get(0);
-        System.out.print("[");
-        ArrayData list = row.getArray(ord);
-        for(int e=0; e < list.numElements(); ++e) {
-          if (e != 0) {
-            System.out.print(", ");
-          }
-          printRow(list, e, child);
-        }
-        System.out.print("]");
-        break;
-      }
-      case MAP: {
-        TypeDescription keyType = schema.getChildren().get(0);
-        TypeDescription valueType = schema.getChildren().get(1);
-        MapData map = row.getMap(ord);
-        ArrayData keys = map.keyArray();
-        ArrayData values = map.valueArray();
-        System.out.print("[");
-        for(int e=0; e < map.numElements(); ++e) {
-          if (e != 0) {
-            System.out.print(", ");
-          }
-          printRow(keys, e, keyType);
-          System.out.print(": ");
-          printRow(values, e, valueType);
-        }
-        System.out.print("]");
-        break;
-      }
-      default:
-        throw new IllegalArgumentException("Unhandled type " + schema);
-    }
-  }
-  static int getArrayElementSize(TypeDescription type) {
-    switch (type.getCategory()) {
-      case BOOLEAN:
-      case BYTE:
-        return 1;
-      case SHORT:
-        return 2;
-      case INT:
-      case FLOAT:
-        return 4;
-      default:
-        return 8;
-    }
-  }
-
-  /**
-   * The common interface for converting from an ORC ColumnVector to a Spark
-   * UnsafeRow. UnsafeRows need two different writer interfaces, so we have
-   * two methods: the first is for structs (UnsafeRowWriter) and the second is
-   * for lists and maps (UnsafeArrayWriter). If Spark adds a common interface
-   * similar to SpecializedGetters, we could use that and a single set of
-   * methods.
-   */
-  interface Converter {
-    void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row);
-    void convert(UnsafeArrayWriter writer, int element, ColumnVector vector,
-                 int row);
-  }
-
-  private static class BooleanConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, ((LongColumnVector) vector).vector[row] != 0);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, ((LongColumnVector) vector).vector[row] != 0);
-      }
-    }
-  }
-
-  private static class ByteConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, (byte) ((LongColumnVector) vector).vector[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, (byte) ((LongColumnVector) vector).vector[row]);
-      }
-    }
-  }
-
-  private static class ShortConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, (short) ((LongColumnVector) vector).vector[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, (short) ((LongColumnVector) vector).vector[row]);
-      }
-    }
-  }
-
-  private static class IntConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, (int) ((LongColumnVector) vector).vector[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, (int) ((LongColumnVector) vector).vector[row]);
-      }
-    }
-  }
-
-  private static class LongConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, ((LongColumnVector) vector).vector[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, ((LongColumnVector) vector).vector[row]);
-      }
-    }
-  }
-
-  private static class FloatConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, (float) ((DoubleColumnVector) vector).vector[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, (float) ((DoubleColumnVector) vector).vector[row]);
-      }
-    }
-  }
-
-  private static class DoubleConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, ((DoubleColumnVector) vector).vector[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, ((DoubleColumnVector) vector).vector[row]);
-      }
-    }
-  }
-
-  private static class TimestampConverter implements Converter {
-
-    private long convert(TimestampColumnVector vector, int row) {
-      // compute microseconds past 1970.
-      long micros = (vector.time[row]/1000) * 1_000_000 + vector.nanos[row] / 1000;
-      return micros;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, convert((TimestampColumnVector) vector, row));
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, convert((TimestampColumnVector) vector, row));
-      }
-    }
-  }
-
-  /**
-   * UnsafeArrayWriter doesn't have a binary form that lets the user pass an
-   * offset and length, so I've added one here. It is a minor tweak of the
-   * UnsafeArrayWriter.write(int, byte[]) method.
-   * @param holder the BufferHolder where the bytes are being written
-   * @param writer the UnsafeArrayWriter
-   * @param ordinal the element that we are writing into
-   * @param input the input bytes
-   * @param offset the first byte from input
-   * @param length the number of bytes to write
-   */
-  static void write(BufferHolder holder, UnsafeArrayWriter writer, int ordinal,
-                    byte[] input, int offset, int length) {
-    final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(length);
-
-    // grow the global buffer before writing data.
-    holder.grow(roundedSize);
-
-    if ((length & 0x07) > 0) {
-      Platform.putLong(holder.buffer, holder.cursor + ((length >> 3) << 3), 0L);
-    }
-
-    // Write the bytes to the variable length portion.
-    Platform.copyMemory(input, Platform.BYTE_ARRAY_OFFSET + offset,
-        holder.buffer, holder.cursor, length);
-
-    writer.setOffsetAndSize(ordinal, holder.cursor, length);
-
-    // move the cursor forward.
-    holder.cursor += roundedSize;
-  }
-
-  private static class BinaryConverter implements Converter {
-    private final BufferHolder holder;
-
-    BinaryConverter(BufferHolder holder) {
-      this.holder = holder;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        BytesColumnVector v = (BytesColumnVector) vector;
-        writer.write(column, v.vector[row], v.start[row], v.length[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        BytesColumnVector v = (BytesColumnVector) vector;
-        write(holder, writer, element, v.vector[row], v.start[row],
-            v.length[row]);
-      }
-    }
-  }
-
-  /**
-   * This hack is to get the unscaled value (for precision <= 18) quickly.
-   * This can be replaced when we upgrade to storage-api 2.5.0.
-   */
-  static class DecimalHack extends FastHiveDecimal {
-    long unscaledLong(FastHiveDecimal value) {
-      fastSet(value);
-      return fastSignum * fast1 * 10_000_000_000_000_000L + fast0;
-    }
-  }
-
-  private static class Decimal18Converter implements Converter {
-    final DecimalHack hack = new DecimalHack();
-    final int precision;
-    final int scale;
-
-    Decimal18Converter(int precision, int scale) {
-      this.precision = precision;
-      this.scale = scale;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        HiveDecimalWritable v = ((DecimalColumnVector) vector).vector[row];
-        writer.write(column,
-            new Decimal().set(hack.unscaledLong(v), precision, v.scale()),
-            precision, scale);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        HiveDecimalWritable v = ((DecimalColumnVector) vector).vector[row];
-        writer.write(element,
-            new Decimal().set(hack.unscaledLong(v), precision, v.scale()),
-            precision, scale);
-      }
-    }
-  }
-
-  private static class Decimal38Converter implements Converter {
-    final int precision;
-    final int scale;
-
-    Decimal38Converter(int precision, int scale) {
-      this.precision = precision;
-      this.scale = scale;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        BigDecimal v = ((DecimalColumnVector) vector).vector[row]
-            .getHiveDecimal().bigDecimalValue();
-        writer.write(column,
-            new Decimal().set(new scala.math.BigDecimal(v), precision, scale),
-            precision, scale);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        BigDecimal v = ((DecimalColumnVector) vector).vector[row]
-            .getHiveDecimal().bigDecimalValue();
-        writer.write(element,
-            new Decimal().set(new scala.math.BigDecimal(v), precision, scale),
-            precision, scale);
-      }
-    }
-  }
-
-  private static class StructConverter implements Converter {
-    private final BufferHolder holder;
-    private final Converter[] children;
-    private final UnsafeRowWriter childWriter;
-
-    StructConverter(BufferHolder holder, TypeDescription schema) {
-      this.holder = holder;
-      children = new Converter[schema.getChildren().size()];
-      for(int c=0; c < children.length; ++c) {
-        children[c] = buildConverter(holder, schema.getChildren().get(c));
-      }
-      childWriter = new UnsafeRowWriter(holder, children.length);
-    }
-
-    int writeStruct(StructColumnVector vector, int row) {
-      int start = holder.cursor;
-      childWriter.reset();
-      for(int c=0; c < children.length; ++c) {
-        children[c].convert(childWriter, c, vector.fields[c], row);
-      }
-      return start;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        int start = writeStruct((StructColumnVector) vector, row);
-        writer.setOffsetAndSize(column, start, holder.cursor - start);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        int start = writeStruct((StructColumnVector) vector, row);
-        writer.setOffsetAndSize(element, start, holder.cursor - start);
-      }
-    }
-  }
-
-  private static class ListConverter implements Converter {
-    private final BufferHolder holder;
-    private final Converter children;
-    private final UnsafeArrayWriter childWriter;
-    private final int elementSize;
-
-    ListConverter(BufferHolder holder, TypeDescription schema) {
-      this.holder = holder;
-      TypeDescription child = schema.getChildren().get(0);
-      children = buildConverter(holder, child);
-      childWriter = new UnsafeArrayWriter();
-      elementSize = getArrayElementSize(child);
-    }
-
-    int writeList(ListColumnVector v, int row) {
-      int offset = (int) v.offsets[row];
-      int length = (int) v.lengths[row];
-      int start = holder.cursor;
-      childWriter.initialize(holder, length, elementSize);
-      for(int c=0; c < length; ++c) {
-        children.convert(childWriter, c, v.child, offset + c);
-      }
-      return start;
-     }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        int start = writeList((ListColumnVector) vector, row);
-        writer.setOffsetAndSize(column, start, holder.cursor - start);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        int start = writeList((ListColumnVector) vector, row);
-        writer.setOffsetAndSize(element, start, holder.cursor - start);
-      }
-    }
-  }
-
-  private static class MapConverter implements Converter {
-    private final BufferHolder holder;
-    private final Converter keyConvert;
-    private final Converter valueConvert;
-    private final UnsafeArrayWriter childWriter;
-    private final int keySize;
-    private final int valueSize;
-
-    MapConverter(BufferHolder holder, TypeDescription schema) {
-      this.holder = holder;
-      TypeDescription keyType = schema.getChildren().get(0);
-      TypeDescription valueType = schema.getChildren().get(1);
-      keyConvert = buildConverter(holder, keyType);
-      keySize = getArrayElementSize(keyType);
-      valueConvert = buildConverter(holder, valueType);
-      valueSize = getArrayElementSize(valueType);
-      childWriter = new UnsafeArrayWriter();
-    }
-
-    int writeMap(MapColumnVector v, int row) {
-      int offset = (int) v.offsets[row];
-      int length = (int) v.lengths[row];
-      int start = holder.cursor;
-      // save room for the key size
-      final int KEY_SIZE_BYTES = 8;
-      holder.grow(KEY_SIZE_BYTES);
-      holder.cursor += KEY_SIZE_BYTES;
-      // serialize the keys
-      childWriter.initialize(holder, length, keySize);
-      for(int c=0; c < length; ++c) {
-        keyConvert.convert(childWriter, c, v.keys, offset + c);
-      }
-      // store the serialized size of the keys
-      Platform.putLong(holder.buffer, start, holder.cursor - start - KEY_SIZE_BYTES);
-      // serialize the values
-      childWriter.initialize(holder, length, valueSize);
-      for(int c=0; c < length; ++c) {
-        valueConvert.convert(childWriter, c, v.values, offset + c);
-      }
-      return start;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        int start = writeMap((MapColumnVector) vector, row);
-        writer.setOffsetAndSize(column, start, holder.cursor - start);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        int start = writeMap((MapColumnVector) vector, row);
-        writer.setOffsetAndSize(element, start, holder.cursor - start);
-      }
-    }
-  }
-
-  static Converter buildConverter(BufferHolder holder, TypeDescription schema) {
-    switch (schema.getCategory()) {
-      case BOOLEAN:
-        return new BooleanConverter();
-      case BYTE:
-        return new ByteConverter();
-      case SHORT:
-        return new ShortConverter();
-      case DATE:
-      case INT:
-        return new IntConverter();
-      case LONG:
-        return new LongConverter();
-      case FLOAT:
-        return new FloatConverter();
-      case DOUBLE:
-        return new DoubleConverter();
-      case TIMESTAMP:
-        return new TimestampConverter();
-      case DECIMAL:
-        if (schema.getPrecision() <= Decimal.MAX_LONG_DIGITS()) {
-          return new Decimal18Converter(schema.getPrecision(), schema.getScale());
-        } else {
-          return new Decimal38Converter(schema.getPrecision(), schema.getScale());
-        }
-      case BINARY:
-      case STRING:
-      case CHAR:
-      case VARCHAR:
-        return new BinaryConverter(holder);
-      case STRUCT:
-        return new StructConverter(holder, schema);
-      case LIST:
-        return new ListConverter(holder, schema);
-      case MAP:
-        return new MapConverter(holder, schema);
-      default:
-        throw new IllegalArgumentException("Unhandled type " + schema);
-    }
-  }
-}
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/com/netflix/iceberg/spark/data/SparkOrcWriter.java
deleted file mode 100644
index 175be10..0000000
+++ /dev/null
@@ -1,434 +0,0 @@
-/*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.netflix.iceberg.spark.data;
-
-import com.netflix.iceberg.Metrics;
-import com.netflix.iceberg.io.FileAppender;
-import com.netflix.iceberg.orc.OrcFileAppender;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.HiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
-import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.DateTimeUtils;
-import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.unsafe.types.UTF8String;
-
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.util.List;
-
-/**
- * This class acts as an adaptor from an OrcFileAppender to a
- * FileAppender&lt;InternalRow&gt;.
- */
-public class SparkOrcWriter implements FileAppender<InternalRow> {
-  private final static int BATCH_SIZE = 1024;
-  private final VectorizedRowBatch batch;
-  private final OrcFileAppender writer;
-  private final Converter[] converters;
-
-  public SparkOrcWriter(OrcFileAppender writer) {
-    TypeDescription schema = writer.getSchema();
-    batch = schema.createRowBatch(BATCH_SIZE);
-    this.writer = writer;
-    converters = buildConverters(schema);
-  }
-
-  /**
-   * The interface for the conversion from Spark's SpecializedGetters to
-   * ORC's ColumnVectors.
-   */
-  interface Converter {
-    /**
-     * Take a value from the Spark data value and add it to the ORC output.
-     * @param rowId the row in the ColumnVector
-     * @param column either the column number or element number
-     * @param data either an InternalRow or ArrayData
-     * @param output the ColumnVector to put the value into
-     */
-    void addValue(int rowId, int column, SpecializedGetters data,
-                  ColumnVector output);
-  }
-
-  static class BooleanConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ? 1 : 0;
-      }
-    }
-  }
-
-  static class ByteConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getByte(column);
-      }
-    }
-  }
-
-  static class ShortConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getShort(column);
-      }
-    }
-  }
-
-  static class IntConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getInt(column);
-      }
-    }
-  }
-
-  static class LongConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getLong(column);
-      }
-    }
-  }
-
-  static class FloatConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DoubleColumnVector) output).vector[rowId] = data.getFloat(column);
-      }
-    }
-  }
-
-  static class DoubleConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DoubleColumnVector) output).vector[rowId] = data.getDouble(column);
-      }
-    }
-  }
-
-  static class StringConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        byte[] value = data.getUTF8String(column).getBytes();
-        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
-      }
-    }
-  }
-
-  static class BytesConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        // getBinary always makes a copy, so we don't need to worry about it
-        // being changed behind our back.
-        byte[] value = data.getBinary(column);
-        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
-      }
-    }
-  }
-
-  static class TimestampConverter implements Converter {
-
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        TimestampColumnVector cv = (TimestampColumnVector) output;
-        long micros = data.getLong(column);
-        cv.time[rowId] = (micros / 1_000_000) * 1000;
-        int nanos = (int) (micros % 1_000_000) * 1000;
-        if (nanos < 0) {
-          nanos += 1_000_000_000;
-         cv.time[rowId] -= 1000;
-        }
-        cv.nanos[rowId] = nanos;
-      }
-    }
-  }
-
-  static class Decimal18Converter implements Converter {
-    private final int precision;
-    private final int scale;
-
-    Decimal18Converter(TypeDescription schema) {
-      precision = schema.getPrecision();
-      scale = schema.getScale();
-    }
-
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(
-            data.getDecimal(column, precision, scale).toUnscaledLong(), scale);
-      }
-    }
-  }
-
-  static class Decimal38Converter implements Converter {
-    private final int precision;
-    private final int scale;
-
-    Decimal38Converter(TypeDescription schema) {
-      precision = schema.getPrecision();
-      scale = schema.getScale();
-    }
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DecimalColumnVector) output).vector[rowId].set(
-            HiveDecimal.create(data.getDecimal(column, precision, scale)
-                .toJavaBigDecimal()));
-      }
-    }
-  }
-
-  static class StructConverter implements Converter {
-    private final Converter[] children;
-
-    StructConverter(TypeDescription schema) {
-      children = new Converter[schema.getChildren().size()];
-      for(int c=0; c < children.length; ++c) {
-        children[c] = buildConverter(schema.getChildren().get(c));
-      }
-    }
-
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        InternalRow value = data.getStruct(column, children.length);
-        StructColumnVector cv = (StructColumnVector) output;
-        for(int c=0; c < children.length; ++c) {
-          children[c].addValue(rowId, c, value, cv.fields[c]);
-        }
-      }
-    }
-  }
-
-  static class ListConverter implements Converter {
-    private final Converter children;
-
-    ListConverter(TypeDescription schema) {
-      children = buildConverter(schema.getChildren().get(0));
-    }
-
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ArrayData value = data.getArray(column);
-        ListColumnVector cv = (ListColumnVector) output;
-        // record the length and start of the list elements
-        cv.lengths[rowId] = value.numElements();
-        cv.offsets[rowId] = cv.childCount;
-        cv.childCount += cv.lengths[rowId];
-        // make sure the child is big enough
-        cv.child.ensureSize(cv.childCount, true);
-        // Add each element
-        for(int e=0; e < cv.lengths[rowId]; ++e) {
-          children.addValue((int) (e + cv.offsets[rowId]), e, value, cv.child);
-        }
-      }
-    }
-  }
-
-  static class MapConverter implements Converter {
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-
-    MapConverter(TypeDescription schema) {
-      keyConverter = buildConverter(schema.getChildren().get(0));
-      valueConverter = buildConverter(schema.getChildren().get(1));
-    }
-
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        MapData map = data.getMap(column);
-        ArrayData key = map.keyArray();
-        ArrayData value = map.valueArray();
-        MapColumnVector cv = (MapColumnVector) output;
-        // record the length and start of the list elements
-        cv.lengths[rowId] = value.numElements();
-        cv.offsets[rowId] = cv.childCount;
-        cv.childCount += cv.lengths[rowId];
-        // make sure the child is big enough
-        cv.keys.ensureSize(cv.childCount, true);
-        cv.values.ensureSize(cv.childCount, true);
-        // Add each element
-        for(int e=0; e < cv.lengths[rowId]; ++e) {
-          int pos = (int)(e + cv.offsets[rowId]);
-          keyConverter.addValue(pos, e, key, cv.keys);
-          valueConverter.addValue(pos, e, value, cv.values);
-        }
-      }
-    }
-  }
-
-  private static Converter buildConverter(TypeDescription schema) {
-    switch (schema.getCategory()) {
-      case BOOLEAN:
-        return new BooleanConverter();
-      case BYTE:
-        return new ByteConverter();
-      case SHORT:
-        return new ShortConverter();
-      case DATE:
-      case INT:
-        return new IntConverter();
-      case LONG:
-        return new LongConverter();
-      case FLOAT:
-        return new FloatConverter();
-      case DOUBLE:
-        return new DoubleConverter();
-      case BINARY:
-        return new BytesConverter();
-      case STRING:
-      case CHAR:
-      case VARCHAR:
-        return new StringConverter();
-      case DECIMAL:
-        return schema.getPrecision() <= 18
-            ? new Decimal18Converter(schema)
-            : new Decimal38Converter(schema);
-      case TIMESTAMP:
-        return new TimestampConverter();
-      case STRUCT:
-        return new StructConverter(schema);
-      case LIST:
-        return new ListConverter(schema);
-      case MAP:
-        return new MapConverter(schema);
-    }
-    throw new IllegalArgumentException("Unhandled type " + schema);
-  }
-
-  private static Converter[] buildConverters(TypeDescription schema) {
-    if (schema.getCategory() != TypeDescription.Category.STRUCT) {
-      throw new IllegalArgumentException("Top level must be a struct " + schema);
-    }
-    List<TypeDescription> children = schema.getChildren();
-    Converter[] result = new Converter[children.size()];
-    for(int c=0; c < children.size(); ++c) {
-      result[c] = buildConverter(children.get(c));
-    }
-    return result;
-  }
-
-  @Override
-  public void add(InternalRow datum) {
-    int row = batch.size++;
-    for(int c=0; c < converters.length; ++c) {
-      converters[c].addValue(row, c, datum, batch.cols[c]);
-    }
-    if (batch.size == BATCH_SIZE) {
-      writer.add(batch);
-      batch.reset();
-    }
-  }
-
-  @Override
-  public Metrics metrics() {
-    return writer.metrics();
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (batch.size > 0) {
-      writer.add(batch);
-      batch.reset();
-    }
-    writer.close();
-  }
-}
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
index 78fbb80..4a008ee 100644
@@ -20,7 +20,6 @@
 package com.netflix.iceberg.spark.source;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import com.netflix.iceberg.CombinedScanTask;
 import com.netflix.iceberg.DataFile;
 import com.netflix.iceberg.FileScanTask;
@@ -39,10 +38,9 @@ import com.netflix.iceberg.hadoop.HadoopInputFile;
 import com.netflix.iceberg.io.CloseableIterable;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.parquet.Parquet;
-import com.netflix.iceberg.spark.SparkExpressions;
+import com.netflix.iceberg.spark.SparkFilters;
 import com.netflix.iceberg.spark.SparkSchemaUtil;
 import com.netflix.iceberg.spark.data.SparkAvroReader;
-import com.netflix.iceberg.spark.data.SparkOrcReader;
 import com.netflix.iceberg.spark.data.SparkParquetReaders;
 import com.netflix.iceberg.types.TypeUtil;
 import com.netflix.iceberg.types.Types;
@@ -53,15 +51,14 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.Statistics;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownCatalystFilters;
+import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
 import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
 import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
-import org.apache.spark.sql.sources.v2.reader.SupportsScanUnsafeRow;
 import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
@@ -86,17 +83,16 @@ import static com.netflix.iceberg.spark.SparkSchemaUtil.prune;
 import static scala.collection.JavaConverters.asScalaBufferConverter;
 import static scala.collection.JavaConverters.seqAsJavaListConverter;
 
-class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDownCatalystFilters,
-    SupportsPushDownRequiredColumns, SupportsReportStatistics {
+class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns,
+    SupportsReportStatistics {
 
-  private static final org.apache.spark.sql.catalyst.expressions.Expression[] NO_EXPRS =
-      new org.apache.spark.sql.catalyst.expressions.Expression[0];
+  private static final Filter[] NO_FILTERS = new Filter[0];
 
   private final Table table;
   private final SerializableConfiguration conf;
   private StructType requestedSchema = null;
   private List<Expression> filterExpressions = null;
-  private org.apache.spark.sql.catalyst.expressions.Expression[] pushedExprs = NO_EXPRS;
+  private Filter[] pushedFilters = NO_FILTERS;
 
   // lazy variables
   private Schema schema = null;
@@ -133,11 +129,11 @@ class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDow
   }
 
   @Override
-  public List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories() {
+  public List<InputPartition<InternalRow>> planInputPartitions() {
     String tableSchemaString = SchemaParser.toJson(table.schema());
     String expectedSchemaString = SchemaParser.toJson(lazySchema());
 
-    List<DataReaderFactory<UnsafeRow>> readTasks = Lists.newArrayList();
+    List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
     for (CombinedScanTask task : tasks()) {
       readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, conf));
     }
@@ -146,16 +142,14 @@ class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDow
   }
 
   @Override
-  public org.apache.spark.sql.catalyst.expressions.Expression[] pushCatalystFilters(
-      org.apache.spark.sql.catalyst.expressions.Expression[] filters) {
+  public Filter[] pushFilters(Filter[] filters) {
     this.tasks = null; // invalidate cached tasks, if present
 
     List<Expression> expressions = Lists.newArrayListWithExpectedSize(filters.length);
-    List<org.apache.spark.sql.catalyst.expressions.Expression> pushed =
-        Lists.newArrayListWithExpectedSize(filters.length);
+    List<Filter> pushed = Lists.newArrayListWithExpectedSize(filters.length);
 
-    for (org.apache.spark.sql.catalyst.expressions.Expression filter : filters) {
-      Expression expr = SparkExpressions.convert(filter);
+    for (Filter filter : filters) {
+      Expression expr = SparkFilters.convert(filter);
       if (expr != null) {
         expressions.add(expr);
         pushed.add(filter);
@@ -163,7 +157,7 @@ class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDow
     }
 
     this.filterExpressions = expressions;
-    this.pushedExprs = pushed.toArray(new org.apache.spark.sql.catalyst.expressions.Expression[0]);
+    this.pushedFilters = pushed.toArray(new Filter[0]);
 
     // invalidate the schema that will be projected
     this.schema = null;
@@ -175,8 +169,8 @@ class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDow
   }
 
   @Override
-  public org.apache.spark.sql.catalyst.expressions.Expression[] pushedCatalystFilters() {
-    return pushedExprs;
+  public Filter[] pushedFilters() {
+    return pushedFilters;
   }
 
   @Override
@@ -189,7 +183,7 @@ class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDow
   }
 
   @Override
-  public Statistics getStatistics() {
+  public Statistics estimateStatistics() {
     long sizeInBytes = 0L;
     long numRows = 0L;
 
@@ -230,7 +224,7 @@ class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDow
         table, lazySchema().asStruct(), filterExpressions);
   }
 
-  private static class ReadTask implements DataReaderFactory<UnsafeRow>, Serializable {
+  private static class ReadTask implements InputPartition<InternalRow>, Serializable {
     private final CombinedScanTask task;
     private final String tableSchemaString;
     private final String expectedSchemaString;
@@ -248,7 +242,7 @@ class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDow
     }
 
     @Override
-    public DataReader<UnsafeRow> createDataReader() {
+    public InputPartitionReader<InternalRow> createPartitionReader() {
       return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), conf.value());
     }
 
@@ -267,7 +261,7 @@ class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDow
     }
   }
 
-  private static class TaskDataReader implements DataReader<UnsafeRow> {
+  private static class TaskDataReader implements InputPartitionReader<InternalRow> {
     // for some reason, the apply method can't be called from Java without reflection
     private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
         .impl(UnsafeProjection.class, InternalRow.class)
@@ -278,9 +272,9 @@ class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDow
     private final Schema expectedSchema;
     private final Configuration conf;
 
-    private Iterator<UnsafeRow> currentIterator = null;
+    private Iterator<InternalRow> currentIterator = null;
     private Closeable currentCloseable = null;
-    private UnsafeRow current = null;
+    private InternalRow current = null;
 
     public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, Configuration conf) {
       this.tasks = task.files().iterator();
@@ -309,7 +303,7 @@ class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDow
     }
 
     @Override
-    public UnsafeRow get() {
+    public InternalRow get() {
       return current;
     }
 
@@ -324,13 +318,13 @@ class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDow
       }
     }
 
-    private Iterator<UnsafeRow> open(FileScanTask task) {
+    private Iterator<InternalRow> open(FileScanTask task) {
       DataFile file = task.file();
 
       // schema or rows returned by readers
       Schema finalSchema = expectedSchema;
       PartitionSpec spec = task.spec();
-      Set<Integer> idColumns = identitySourceIds(spec);
+      Set<Integer> idColumns = spec.identitySourceIds();
 
       // schema needed for the projection and filtering
       Schema requiredSchema = prune(tableSchema, convert(finalSchema), task.residual());
@@ -365,6 +359,7 @@ class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDow
         iter = open(task, finalSchema, conf);
       }
 
+      // TODO: remove the projection by reporting the iterator's schema back to Spark
       return transform(iter,
           APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
     }
@@ -391,29 +386,11 @@ class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDow
           asScalaBufferConverter(attrs).asScala().toSeq());
     }
 
-    private static Set<Integer> identitySourceIds(PartitionSpec spec) {
-      Set<Integer> sourceIds = Sets.newHashSet();
-      List<PartitionField> fields = spec.fields();
-      for (int i = 0; i < fields.size(); i += 1) {
-        PartitionField field = fields.get(i);
-        if ("identity".equals(field.transform().toString())) {
-          sourceIds.add(field.sourceId());
-        }
-      }
-
-      return sourceIds;
-    }
-
     private Iterator<InternalRow> open(FileScanTask task, Schema readSchema,
                                        Configuration conf) {
       InputFile location = HadoopInputFile.fromLocation(task.file().path(), conf);
       CloseableIterable<InternalRow> iter;
       switch (task.file().format()) {
-        case ORC:
-          SparkOrcReader reader = new SparkOrcReader(location, task, readSchema);
-          this.currentCloseable = reader;
-          return reader;
-
         case PARQUET:
           iter = newParquetIterable(location, task, readSchema);
           break;
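
To make the read-side change concrete: in Spark 2.4, planning returns serializable InputPartition objects and each one creates an InputPartitionReader that iterates InternalRow, which is the pattern ReadTask and TaskDataReader follow above. A minimal sketch with hypothetical Example* names (not the Iceberg classes), assuming rows come from an in-memory iterator:

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

// Hypothetical classes illustrating the Spark 2.4 read contract.
class ExamplePartition implements InputPartition<InternalRow> {
  @Override
  public InputPartitionReader<InternalRow> createPartitionReader() {
    // called on executors; the partition itself must be serializable
    return new ExamplePartitionReader(Collections.emptyIterator());
  }
}

class ExamplePartitionReader implements InputPartitionReader<InternalRow> {
  private final Iterator<InternalRow> rows;
  private InternalRow current = null;

  ExamplePartitionReader(Iterator<InternalRow> rows) {
    this.rows = rows;
  }

  @Override
  public boolean next() throws IOException {
    if (rows.hasNext()) {
      current = rows.next();
      return true;
    }
    return false;
  }

  @Override
  public InternalRow get() {
    return current;
  }

  @Override
  public void close() throws IOException {
    // release any open files or streams here
  }
}

Because the scan now produces InternalRow instead of UnsafeRow, the UnsafeRow-specific scan hooks used against Spark 2.3 are no longer involved.
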
index e729474..bed2cf6 100644 (file)
@@ -19,7 +19,6 @@
 
 package com.netflix.iceberg.spark.source;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -40,10 +39,8 @@ import com.netflix.iceberg.hadoop.HadoopOutputFile;
 import com.netflix.iceberg.io.FileAppender;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
-import com.netflix.iceberg.orc.ORC;
 import com.netflix.iceberg.parquet.Parquet;
 import com.netflix.iceberg.spark.data.SparkAvroWriter;
-import com.netflix.iceberg.spark.data.SparkOrcWriter;
 import com.netflix.iceberg.transforms.Transform;
 import com.netflix.iceberg.transforms.Transforms;
 import com.netflix.iceberg.types.Types.StringType;
@@ -56,7 +53,6 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
 import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.util.SerializableConfiguration;
 import org.slf4j.Logger;
@@ -86,7 +82,7 @@ import static com.netflix.iceberg.TableProperties.OBJECT_STORE_PATH;
 import static com.netflix.iceberg.spark.SparkSchemaUtil.convert;
 
 // TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage
-class Writer implements DataSourceWriter, SupportsWriteInternalRow {
+class Writer implements DataSourceWriter {
   private static final Transform<String, Integer> HASH_FUNC = Transforms
       .bucket(StringType.get(), Integer.MAX_VALUE);
   private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
@@ -102,7 +98,7 @@ class Writer implements DataSourceWriter, SupportsWriteInternalRow {
   }
 
   @Override
-  public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
+  public DataWriterFactory<InternalRow> createWriterFactory() {
     return new WriterFactory(table.spec(), format, dataLocation(), table.properties(), conf);
   }
 
@@ -218,9 +214,8 @@ class Writer implements DataSourceWriter, SupportsWriteInternalRow {
     }
 
     @Override
-    public DataWriter<InternalRow> createDataWriter(int partitionId, int attemptNumber) {
-      String filename = format.addExtension(String.format("%05d-%d-%s",
-          partitionId, attemptNumber, uuid));
+    public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
+      String filename = format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, uuid));
       AppenderFactory<InternalRow> factory = new SparkAppenderFactory();
       if (spec.fields().isEmpty()) {
         return new UnpartitionedWriter(lazyDataPath(), filename, format, conf.value(), factory);
@@ -301,13 +296,6 @@ class Writer implements DataSourceWriter, SupportsWriteInternalRow {
                   .schema(schema)
                   .build();
 
-            case ORC: {
-              @SuppressWarnings("unchecked")
-              SparkOrcWriter writer = new SparkOrcWriter(ORC.write(file)
-                  .schema(schema)
-                  .build());
-              return writer;
-            }
             default:
               throw new UnsupportedOperationException("Cannot write unknown format: " + format);
           }
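
On the write side, the changes above drop SupportsWriteInternalRow (DataSourceWriter now takes InternalRow directly through createWriterFactory()) and move createDataWriter to the Spark 2.4 (partitionId, taskId, epochId) signature. A minimal sketch of that contract with hypothetical Example* names and no real file I/O:

import java.io.IOException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Hypothetical classes illustrating the Spark 2.4 write contract.
class ExampleWriterFactory implements DataWriterFactory<InternalRow> {
  @Override
  public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
    // taskId takes the place of the Spark 2.3 attemptNumber in generated file names
    return new ExampleDataWriter(String.format("%05d-%d", partitionId, taskId));
  }
}

class ExampleDataWriter implements DataWriter<InternalRow> {
  private final String filename;
  private long rowCount = 0L;

  ExampleDataWriter(String filename) {
    this.filename = filename;
  }

  @Override
  public void write(InternalRow record) throws IOException {
    rowCount += 1; // a real writer appends the row to an open data file
  }

  @Override
  public WriterCommitMessage commit() throws IOException {
    // report what this task produced so the driver can commit it
    return new ExampleCommitMessage(filename, rowCount);
  }

  @Override
  public void abort() throws IOException {
    // clean up any files written by this task
  }
}

// WriterCommitMessage is a serializable marker interface.
class ExampleCommitMessage implements WriterCommitMessage {
  final String filename;
  final long rowCount;

  ExampleCommitMessage(String filename, long rowCount) {
    this.filename = filename;
    this.rowCount = rowCount;
  }
}

In the actual Writer the commit message presumably carries the data files produced by the task; the sketch only records a filename and row count.
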
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/data/CodegenExamples.java b/spark/src/test/java/com/netflix/iceberg/spark/data/CodegenExamples.java
deleted file mode 100644 (file)
index 5ddef24..0000000
+++ /dev/null
@@ -1,707 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.netflix.iceberg.spark.data;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
-import org.apache.spark.sql.catalyst.expressions.UnsafeMapData;
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.types.UTF8String;
-
-public class CodegenExamples {
-
-
-  class Example1 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-
-    public Example1(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(2);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 2);
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      long value = isNull ? -1L : (i.getLong(0));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        rowWriter.write(0, value);
-      }
-
-
-      boolean isNull1 = i.isNullAt(1);
-      UTF8String value1 = isNull1 ? null : (i.getUTF8String(1));
-      if (isNull1) {
-        rowWriter.setNullAt(1);
-      } else {
-        rowWriter.write(1, value1);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-
-  class Example2 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter1;
-
-    public Example2(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(1);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-      this.rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      InternalRow value = isNull ? null : (i.getStruct(0, 1));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        // Remember the current cursor so that we can calculate how many bytes are
-        // written later.
-        final int tmpCursor = holder.cursor;
-
-        if (value instanceof UnsafeRow) {
-
-          final int sizeInBytes = ((UnsafeRow) value).getSizeInBytes();
-          // grow the global buffer before writing data.
-          holder.grow(sizeInBytes);
-          ((UnsafeRow) value).writeToMemory(holder.buffer, holder.cursor);
-          holder.cursor += sizeInBytes;
-
-        } else {
-          rowWriter1.reset();
-
-
-          boolean isNull1 = value.isNullAt(0);
-          float value1 = isNull1 ? -1.0f : value.getFloat(0);
-
-          if (isNull1) {
-            rowWriter1.setNullAt(0);
-          } else {
-            rowWriter1.write(0, value1);
-          }
-        }
-
-        rowWriter.setOffsetAndSize(0, tmpCursor, holder.cursor - tmpCursor);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-
-  class Example3 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter1;
-
-    public Example3(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(1);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-      this.rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 2);
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      InternalRow value = isNull ? null : (i.getStruct(0, 2));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        // Remember the current cursor so that we can calculate how many bytes are
-        // written later.
-        final int tmpCursor = holder.cursor;
-
-        if (value instanceof UnsafeRow) {
-
-          final int sizeInBytes = ((UnsafeRow) value).getSizeInBytes();
-          // grow the global buffer before writing data.
-          holder.grow(sizeInBytes);
-          ((UnsafeRow) value).writeToMemory(holder.buffer, holder.cursor);
-          holder.cursor += sizeInBytes;
-
-        } else {
-          rowWriter1.reset();
-
-
-          boolean isNull1 = value.isNullAt(0);
-          float value1 = isNull1 ? -1.0f : value.getFloat(0);
-
-          if (isNull1) {
-            rowWriter1.setNullAt(0);
-          } else {
-            rowWriter1.write(0, value1);
-          }
-
-
-          boolean isNull2 = value.isNullAt(1);
-          float value2 = isNull2 ? -1.0f : value.getFloat(1);
-
-          if (isNull2) {
-            rowWriter1.setNullAt(1);
-          } else {
-            rowWriter1.write(1, value2);
-          }
-        }
-
-        rowWriter.setOffsetAndSize(0, tmpCursor, holder.cursor - tmpCursor);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-
-
-  class Example4 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter1;
-
-    public Example4(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(1);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-      this.arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-      this.arrayWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      MapData value = isNull ? null : (i.getMap(0));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        // Remember the current cursor so that we can calculate how many bytes are
-        // written later.
-        final int tmpCursor = holder.cursor;
-
-        if (value instanceof UnsafeMapData) {
-
-          final int sizeInBytes = ((UnsafeMapData) value).getSizeInBytes();
-          // grow the global buffer before writing data.
-          holder.grow(sizeInBytes);
-          ((UnsafeMapData) value).writeToMemory(holder.buffer, holder.cursor);
-          holder.cursor += sizeInBytes;
-
-        } else {
-          final ArrayData keys = value.keyArray();
-          final ArrayData values = value.valueArray();
-
-          // preserve 8 bytes to write the key array numBytes later.
-          holder.grow(8);
-          holder.cursor += 8;
-
-          // Remember the current cursor so that we can write numBytes of key array later.
-          final int tmpCursor1 = holder.cursor;
-
-
-          if (keys instanceof UnsafeArrayData) {
-
-            final int sizeInBytes1 = ((UnsafeArrayData) keys).getSizeInBytes();
-            // grow the global buffer before writing data.
-            holder.grow(sizeInBytes1);
-            ((UnsafeArrayData) keys).writeToMemory(holder.buffer, holder.cursor);
-            holder.cursor += sizeInBytes1;
-
-          } else {
-            final int numElements = keys.numElements();
-            arrayWriter.initialize(holder, numElements, 8);
-
-            for (int index = 0; index < numElements; index++) {
-              if (keys.isNullAt(index)) {
-                arrayWriter.setNull(index);
-              } else {
-                final UTF8String element = keys.getUTF8String(index);
-                arrayWriter.write(index, element);
-              }
-            }
-          }
-
-          // Write the numBytes of key array into the first 8 bytes.
-          Platform.putLong(holder.buffer, tmpCursor1 - 8, holder.cursor - tmpCursor1);
-
-
-          if (values instanceof UnsafeArrayData) {
-
-            final int sizeInBytes2 = ((UnsafeArrayData) values).getSizeInBytes();
-            // grow the global buffer before writing data.
-            holder.grow(sizeInBytes2);
-            ((UnsafeArrayData) values).writeToMemory(holder.buffer, holder.cursor);
-            holder.cursor += sizeInBytes2;
-
-          } else {
-            final int numElements1 = values.numElements();
-            arrayWriter1.initialize(holder, numElements1, 8);
-
-            for (int index1 = 0; index1 < numElements1; index1++) {
-              if (values.isNullAt(index1)) {
-                arrayWriter1.setNull(index1);
-              } else {
-                final UTF8String element1 = values.getUTF8String(index1);
-                arrayWriter1.write(index1, element1);
-              }
-            }
-          }
-
-        }
-
-        rowWriter.setOffsetAndSize(0, tmpCursor, holder.cursor - tmpCursor);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-
-
-  class Example5 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter1;
-
-    public Example5(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(1);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-      this.arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-      this.rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 2);
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      ArrayData value = isNull ? null : (i.getArray(0));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        // Remember the current cursor so that we can calculate how many bytes are
-        // written later.
-        final int tmpCursor = holder.cursor;
-
-        if (value instanceof UnsafeArrayData) {
-
-          final int sizeInBytes1 = ((UnsafeArrayData) value).getSizeInBytes();
-          // grow the global buffer before writing data.
-          holder.grow(sizeInBytes1);
-          ((UnsafeArrayData) value).writeToMemory(holder.buffer, holder.cursor);
-          holder.cursor += sizeInBytes1;
-
-        } else {
-          final int numElements = value.numElements();
-          arrayWriter.initialize(holder, numElements, 8);
-
-          for (int index = 0; index < numElements; index++) {
-            if (value.isNullAt(index)) {
-              arrayWriter.setNull(index);
-            } else {
-              final InternalRow element = value.getStruct(index, 2);
-
-              final int tmpCursor1 = holder.cursor;
-
-              if (element instanceof UnsafeRow) {
-
-                final int sizeInBytes = ((UnsafeRow) element).getSizeInBytes();
-                // grow the global buffer before writing data.
-                holder.grow(sizeInBytes);
-                ((UnsafeRow) element).writeToMemory(holder.buffer, holder.cursor);
-                holder.cursor += sizeInBytes;
-
-              } else {
-                rowWriter1.reset();
-
-
-                boolean isNull1 = element.isNullAt(0);
-                int value1 = isNull1 ? -1 : element.getInt(0);
-
-                if (isNull1) {
-                  rowWriter1.setNullAt(0);
-                } else {
-                  rowWriter1.write(0, value1);
-                }
-
-
-                boolean isNull2 = element.isNullAt(1);
-                int value2 = isNull2 ? -1 : element.getInt(1);
-
-                if (isNull2) {
-                  rowWriter1.setNullAt(1);
-                } else {
-                  rowWriter1.write(1, value2);
-                }
-              }
-
-              arrayWriter.setOffsetAndSize(index, tmpCursor1, holder.cursor - tmpCursor1);
-
-            }
-          }
-        }
-
-        rowWriter.setOffsetAndSize(0, tmpCursor, holder.cursor - tmpCursor);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-
-  class Example6 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter1;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter1;
-
-    public Example6(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(1);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-      this.arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-      this.arrayWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-      this.rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 2);
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      MapData value = isNull ? null : (i.getMap(0));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        // Remember the current cursor so that we can calculate how many bytes are
-        // written later.
-        final int tmpCursor = holder.cursor;
-
-        if (value instanceof UnsafeMapData) {
-
-          final int sizeInBytes = ((UnsafeMapData) value).getSizeInBytes();
-          // grow the global buffer before writing data.
-          holder.grow(sizeInBytes);
-          ((UnsafeMapData) value).writeToMemory(holder.buffer, holder.cursor);
-          holder.cursor += sizeInBytes;
-
-        } else {
-          final ArrayData keys = value.keyArray();
-          final ArrayData values = value.valueArray();
-
-          // preserve 8 bytes to write the key array numBytes later.
-          holder.grow(8);
-          holder.cursor += 8;
-
-          // Remember the current cursor so that we can write numBytes of key array later.
-          final int tmpCursor1 = holder.cursor;
-
-
-          if (keys instanceof UnsafeArrayData) {
-
-            final int sizeInBytes1 = ((UnsafeArrayData) keys).getSizeInBytes();
-            // grow the global buffer before writing data.
-            holder.grow(sizeInBytes1);
-            ((UnsafeArrayData) keys).writeToMemory(holder.buffer, holder.cursor);
-            holder.cursor += sizeInBytes1;
-
-          } else {
-            final int numElements = keys.numElements();
-            arrayWriter.initialize(holder, numElements, 8);
-
-            for (int index = 0; index < numElements; index++) {
-              if (keys.isNullAt(index)) {
-                arrayWriter.setNull(index);
-              } else {
-                final UTF8String element = keys.getUTF8String(index);
-                arrayWriter.write(index, element);
-              }
-            }
-          }
-
-          // Write the numBytes of key array into the first 8 bytes.
-          Platform.putLong(holder.buffer, tmpCursor1 - 8, holder.cursor - tmpCursor1);
-
-
-          if (values instanceof UnsafeArrayData) {
-
-            final int sizeInBytes3 = ((UnsafeArrayData) values).getSizeInBytes();
-            // grow the global buffer before writing data.
-            holder.grow(sizeInBytes3);
-            ((UnsafeArrayData) values).writeToMemory(holder.buffer, holder.cursor);
-            holder.cursor += sizeInBytes3;
-
-          } else {
-            final int numElements1 = values.numElements();
-            arrayWriter1.initialize(holder, numElements1, 8);
-
-            for (int index1 = 0; index1 < numElements1; index1++) {
-              if (values.isNullAt(index1)) {
-                arrayWriter1.setNull(index1);
-              } else {
-                final InternalRow element1 = values.getStruct(index1, 2);
-
-                final int tmpCursor3 = holder.cursor;
-
-                if (element1 instanceof UnsafeRow) {
-
-                  final int sizeInBytes2 = ((UnsafeRow) element1).getSizeInBytes();
-                  // grow the global buffer before writing data.
-                  holder.grow(sizeInBytes2);
-                  ((UnsafeRow) element1).writeToMemory(holder.buffer, holder.cursor);
-                  holder.cursor += sizeInBytes2;
-
-                } else {
-                  rowWriter1.reset();
-
-
-                  boolean isNull1 = element1.isNullAt(0);
-                  float value1 = isNull1 ? -1.0f : element1.getFloat(0);
-
-                  if (isNull1) {
-                    rowWriter1.setNullAt(0);
-                  } else {
-                    rowWriter1.write(0, value1);
-                  }
-
-
-                  boolean isNull2 = element1.isNullAt(1);
-                  float value2 = isNull2 ? -1.0f : element1.getFloat(1);
-
-                  if (isNull2) {
-                    rowWriter1.setNullAt(1);
-                  } else {
-                    rowWriter1.write(1, value2);
-                  }
-                }
-
-                arrayWriter1.setOffsetAndSize(index1, tmpCursor3, holder.cursor - tmpCursor3);
-
-              }
-            }
-          }
-
-        }
-
-        rowWriter.setOffsetAndSize(0, tmpCursor, holder.cursor - tmpCursor);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-
-  class Example7 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter1;
-
-    public Example7(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(1);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-      this.arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-      this.arrayWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      MapData value = isNull ? null : (i.getMap(0));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        // Remember the current cursor so that we can calculate how many bytes are
-        // written later.
-        final int tmpCursor = holder.cursor;
-
-        if (value instanceof UnsafeMapData) {
-
-          final int sizeInBytes = ((UnsafeMapData) value).getSizeInBytes();
-          // grow the global buffer before writing data.
-          holder.grow(sizeInBytes);
-          ((UnsafeMapData) value).writeToMemory(holder.buffer, holder.cursor);
-          holder.cursor += sizeInBytes;
-
-        } else {
-          final ArrayData keys = value.keyArray();
-          final ArrayData values = value.valueArray();
-
-          // preserve 8 bytes to write the key array numBytes later.
-          holder.grow(8);
-          holder.cursor += 8;
-
-          // Remember the current cursor so that we can write numBytes of key array later.
-          final int tmpCursor1 = holder.cursor;
-
-
-          if (keys instanceof UnsafeArrayData) {
-
-            final int sizeInBytes1 = ((UnsafeArrayData) keys).getSizeInBytes();
-            // grow the global buffer before writing data.
-            holder.grow(sizeInBytes1);
-            ((UnsafeArrayData) keys).writeToMemory(holder.buffer, holder.cursor);
-            holder.cursor += sizeInBytes1;
-
-          } else {
-            final int numElements = keys.numElements();
-            arrayWriter.initialize(holder, numElements, 8);
-
-            for (int index = 0; index < numElements; index++) {
-              if (keys.isNullAt(index)) {
-                arrayWriter.setNull(index);
-              } else {
-                final UTF8String element = keys.getUTF8String(index);
-                arrayWriter.write(index, element);
-              }
-            }
-          }
-
-          // Write the numBytes of key array into the first 8 bytes.
-          Platform.putLong(holder.buffer, tmpCursor1 - 8, holder.cursor - tmpCursor1);
-
-
-          if (values instanceof UnsafeArrayData) {
-
-            final int sizeInBytes2 = ((UnsafeArrayData) values).getSizeInBytes();
-            // grow the global buffer before writing data.
-            holder.grow(sizeInBytes2);
-            ((UnsafeArrayData) values).writeToMemory(holder.buffer, holder.cursor);
-            holder.cursor += sizeInBytes2;
-
-          } else {
-            final int numElements1 = values.numElements();
-            arrayWriter1.initialize(holder, numElements1, 8);
-
-            for (int index1 = 0; index1 < numElements1; index1++) {
-              if (values.isNullAt(index1)) {
-                arrayWriter1.setNull(index1);
-              } else {
-                final UTF8String element1 = values.getUTF8String(index1);
-                arrayWriter1.write(index1, element1);
-              }
-            }
-          }
-
-        }
-
-        rowWriter.setOffsetAndSize(0, tmpCursor, holder.cursor - tmpCursor);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-}
index a93675f..3b0d32b 100644 (file)
@@ -63,7 +63,6 @@ public class TestDataFrameWrites extends AvroDataTest {
   public static Object[][] parameters() {
     return new Object[][] {
         new Object[] { "parquet" },
-        new Object[] { "orc" },
         new Object[] { "avro" }
     };
   }
index e459ea6..e0c3fa1 100644 (file)
@@ -20,6 +20,7 @@
 package com.netflix.iceberg.spark.source;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.netflix.iceberg.DataFile;
 import com.netflix.iceberg.DataFiles;
@@ -29,12 +30,11 @@ import com.netflix.iceberg.Schema;
 import com.netflix.iceberg.Table;
 import com.netflix.iceberg.avro.Avro;
 import com.netflix.iceberg.avro.AvroSchemaUtil;
-import com.netflix.iceberg.expressions.Expressions;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.expressions.Literal;
 import com.netflix.iceberg.hadoop.HadoopTables;
 import com.netflix.iceberg.io.FileAppender;
 import com.netflix.iceberg.parquet.Parquet;
-import com.netflix.iceberg.spark.SparkExpressions;
 import com.netflix.iceberg.spark.data.TestHelpers;
 import com.netflix.iceberg.transforms.Transform;
 import com.netflix.iceberg.transforms.Transforms;
@@ -45,16 +45,19 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.api.java.UDF1;
-import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.And;
+import org.apache.spark.sql.sources.EqualTo;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.LessThan;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownCatalystFilters;
-import org.apache.spark.sql.sources.v2.reader.SupportsScanUnsafeRow;
-import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
 import org.apache.spark.sql.types.IntegerType$;
-import org.apache.spark.sql.types.StringType$;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -74,10 +77,7 @@ import java.util.UUID;
 import static com.netflix.iceberg.Files.localOutput;
 import static org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp;
 import static org.apache.spark.sql.functions.callUDF;
-import static org.apache.spark.sql.functions.col;
 import static org.apache.spark.sql.functions.column;
-import static org.apache.spark.sql.functions.lit;
-import static org.apache.spark.sql.functions.to_date;
 
 @RunWith(Parameterized.class)
 public class TestFilteredScan {
@@ -102,10 +102,6 @@ public class TestFilteredScan {
       .hour("ts")
       .build();
 
-  private static final PartitionSpec PARTITION_BY_FIRST_LETTER = PartitionSpec.builderFor(SCHEMA)
-      .truncate("data", 1)
-      .build();
-
   private static SparkSession spark = null;
 
   @BeforeClass
@@ -118,18 +114,13 @@ public class TestFilteredScan {
 
     Transform<Long, Integer> day = Transforms.day(Types.TimestampType.withZone());
     spark.udf().register("ts_day",
-        (UDF1<Timestamp, Integer>) timestamp -> day.apply(fromJavaTimestamp(timestamp)),
+        (UDF1<Timestamp, Integer>) timestamp -> day.apply((Long) fromJavaTimestamp(timestamp)),
         IntegerType$.MODULE$);
 
     Transform<Long, Integer> hour = Transforms.hour(Types.TimestampType.withZone());
     spark.udf().register("ts_hour",
-        (UDF1<Timestamp, Integer>) timestamp -> hour.apply(fromJavaTimestamp(timestamp)),
+        (UDF1<Timestamp, Integer>) timestamp -> hour.apply((Long) fromJavaTimestamp(timestamp)),
         IntegerType$.MODULE$);
-
-    Transform<CharSequence, CharSequence> trunc1 = Transforms.truncate(Types.StringType.get(), 1);
-    spark.udf().register("trunc1",
-        (UDF1<CharSequence, CharSequence>) str -> trunc1.apply(str.toString()),
-        StringType$.MODULE$);
   }
 
   @AfterClass
@@ -216,9 +207,9 @@ public class TestFilteredScan {
     for (int i = 0; i < 10; i += 1) {
       DataSourceReader reader = source.createReader(options);
 
-      pushFilters(reader, Expressions.equal("id", i));
+      pushFilters(reader, EqualTo.apply("id", i));
 
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
       Assert.assertEquals("Should only create one task for a small file", 1, tasks.size());
 
       // validate row filtering
@@ -237,9 +228,9 @@ public class TestFilteredScan {
 
     DataSourceReader reader = source.createReader(options);
 
-    pushFilters(reader, Expressions.lessThan("ts", "2017-12-22T00:00:00+00:00"));
+    pushFilters(reader, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
 
-    List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+    List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
     Assert.assertEquals("Should only create one task for a small file", 1, tasks.size());
 
     assertEqualsSafe(SCHEMA.asStruct(), expected(5,6,7,8,9),
@@ -257,14 +248,14 @@ public class TestFilteredScan {
     IcebergSource source = new IcebergSource();
     DataSourceReader unfiltered = source.createReader(options);
     Assert.assertEquals("Unfiltered table should created 4 read tasks",
-        4, planTasks(unfiltered).size());
+        4, unfiltered.planInputPartitions().size());
 
     for (int i = 0; i < 10; i += 1) {
       DataSourceReader reader = source.createReader(options);
 
-      pushFilters(reader, Expressions.equal("id", i));
+      pushFilters(reader, EqualTo.apply("id", i));
 
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
 
       // validate predicate push-down
       Assert.assertEquals("Should create one task for a single bucket", 1, tasks.size());
@@ -282,18 +273,17 @@ public class TestFilteredScan {
         "path", location.toString())
     );
 
-    int day = Literal.of("2017-12-21").<Integer>to(Types.DateType.get()).value();
     IcebergSource source = new IcebergSource();
     DataSourceReader unfiltered = source.createReader(options);
     Assert.assertEquals("Unfiltered table should created 2 read tasks",
-        2, planTasks(unfiltered).size());
+        2, unfiltered.planInputPartitions().size());
 
     {
       DataSourceReader reader = source.createReader(options);
 
-      pushFilters(reader, Expressions.lessThan("ts", "2017-12-22T00:00:00+00:00"));
+      pushFilters(reader, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
 
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
       Assert.assertEquals("Should create one task for 2017-12-21", 1, tasks.size());
 
       assertEqualsSafe(SCHEMA.asStruct(), expected(5, 6, 7, 8, 9),
@@ -303,35 +293,11 @@ public class TestFilteredScan {
     {
       DataSourceReader reader = source.createReader(options);
 
-      pushFilters(reader, col("ts").cast(DateType$.MODULE$).$eq$eq$eq(lit(day)).expr());
-
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
-      Assert.assertEquals("Should create one task for 2017-12-21", 1, tasks.size());
-
-      assertEqualsSafe(SCHEMA.asStruct(), expected(5, 6, 7, 8, 9),
-          read(location.toString(), "cast(ts as date) = date '2017-12-21'"));
-    }
+      pushFilters(reader, And.apply(
+          GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00"),
+          LessThan.apply("ts", "2017-12-22T08:00:00+00:00")));
 
-    {
-      DataSourceReader reader = source.createReader(options);
-
-      pushFilters(reader, to_date(col("ts")).$eq$eq$eq(lit(day)).expr());
-
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
-      Assert.assertEquals("Should create one task for 2017-12-21", 1, tasks.size());
-
-      assertEqualsSafe(SCHEMA.asStruct(), expected(5, 6, 7, 8, 9),
-          read(location.toString(), "to_date(ts) = date '2017-12-21'"));
-    }
-
-    {
-      DataSourceReader reader = source.createReader(options);
-
-      pushFilters(reader, Expressions.and(
-          Expressions.greaterThan("ts", "2017-12-22T06:00:00+00:00"),
-          Expressions.lessThan("ts", "2017-12-22T08:00:00+00:00")));
-
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
       Assert.assertEquals("Should create one task for 2017-12-22", 1, tasks.size());
 
       assertEqualsSafe(SCHEMA.asStruct(), expected(1, 2), read(location.toString(),
@@ -351,14 +317,14 @@ public class TestFilteredScan {
     IcebergSource source = new IcebergSource();
     DataSourceReader unfiltered = source.createReader(options);
     Assert.assertEquals("Unfiltered table should created 9 read tasks",
-        9, planTasks(unfiltered).size());
+        9, unfiltered.planInputPartitions().size());
 
     {
       DataSourceReader reader = source.createReader(options);
 
-      pushFilters(reader, Expressions.lessThan("ts", "2017-12-22T00:00:00+00:00"));
+      pushFilters(reader, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
 
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
       Assert.assertEquals("Should create 4 tasks for 2017-12-21: 15, 17, 21, 22", 4, tasks.size());
 
       assertEqualsSafe(SCHEMA.asStruct(), expected(8, 9, 7, 6, 5),
@@ -368,11 +334,11 @@ public class TestFilteredScan {
     {
       DataSourceReader reader = source.createReader(options);
 
-      pushFilters(reader, Expressions.and(
-          Expressions.greaterThan("ts", "2017-12-22T06:00:00+00:00"),
-          Expressions.lessThan("ts", "2017-12-22T08:00:00+00:00")));
+      pushFilters(reader, And.apply(
+          GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00"),
+          LessThan.apply("ts", "2017-12-22T08:00:00+00:00")));
 
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
       Assert.assertEquals("Should create 2 tasks for 2017-12-22: 6, 7", 2, tasks.size());
 
       assertEqualsSafe(SCHEMA.asStruct(), expected(2, 1), read(location.toString(),
@@ -382,41 +348,6 @@ public class TestFilteredScan {
   }
 
   @Test
-  public void testTrunctateDataPartitionedFilters() {
-    File location = buildPartitionedTable("trunc", PARTITION_BY_FIRST_LETTER, "trunc1", "data");
-
-    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
-        "path", location.toString())
-    );
-
-    IcebergSource source = new IcebergSource();
-    DataSourceReader unfiltered = source.createReader(options);
-    Assert.assertEquals("Unfiltered table should have created 9 read tasks",
-        9, planTasks(unfiltered).size());
-
-    {
-      DataSourceReader reader = source.createReader(options);
-
-      pushFilters(reader, Expressions.equal("data", "goldfish"));
-
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
-      Assert.assertEquals("Should create 1 task for 'goldfish' (g)", 1, tasks.size());
-    }
-
-    {
-      DataSourceReader reader = source.createReader(options);
-
-      pushFilters(reader, col("data").$eq$eq$eq("goldfish").expr());
-
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
-      Assert.assertEquals("Should create 1 task for 'goldfish' (g)", 1, tasks.size());
-    }
-
-    assertEqualsSafe(SCHEMA.asStruct(), expected(9),
-        read(location.toString(), "data = 'goldfish'"));
-  }
-
-  @Test
   public void testFilterByNonProjectedColumn() {
     {
       Schema actualProjection = SCHEMA.select("id", "data");
@@ -426,9 +357,9 @@ public class TestFilteredScan {
       }
 
       assertEqualsSafe(actualProjection.asStruct(), expected, read(
-              unpartitioned.toString(),
-              "cast('2017-12-22 00:00:00+00:00' as timestamp) > ts",
-              "id", "data"));
+          unpartitioned.toString(),
+          "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)",
+          "id", "data"));
     }
 
     {
@@ -443,7 +374,7 @@ public class TestFilteredScan {
       assertEqualsSafe(actualProjection.asStruct(), expected, read(
           unpartitioned.toString(),
           "ts > cast('2017-12-22 06:00:00+00:00' as timestamp) and " +
-              "cast('2017-12-22 08:00:00+00:00' as timestamp) > ts",
+              "ts < cast('2017-12-22 08:00:00+00:00' as timestamp)",
           "id"));
     }
   }
@@ -459,6 +390,16 @@ public class TestFilteredScan {
     return result;
   }
 
+  public static void assertEqualsUnsafe(Types.StructType struct,
+                                        List<Record> expected, List<UnsafeRow> actual) {
+    // TODO: match records by ID
+    int numRecords = Math.min(expected.size(), actual.size());
+    for (int i = 0; i < numRecords; i += 1) {
+      TestHelpers.assertEqualsUnsafe(struct, expected.get(i), actual.get(i));
+    }
+    Assert.assertEquals("Number of results should match expected", expected.size(), actual.size());
+  }
+
   public static void assertEqualsSafe(Types.StructType struct,
                                       List<Record> expected, List<Row> actual) {
     // TODO: match records by ID
@@ -477,26 +418,10 @@ public class TestFilteredScan {
     return expected;
   }
 
-  private void pushFilters(DataSourceReader reader,
-                           com.netflix.iceberg.expressions.Expression... filters) {
-    Expression[] expressions = new Expression[filters.length];
-    for (int i = 0; i < filters.length; i += 1) {
-      expressions[i] = SparkExpressions.convert(filters[i], SCHEMA);
-    }
-    pushFilters(reader, expressions);
-  }
-
-  private void pushFilters(DataSourceReader reader,
-                           Expression... expressions) {
-    Assert.assertTrue(reader instanceof SupportsPushDownCatalystFilters);
-    SupportsPushDownCatalystFilters filterable = (SupportsPushDownCatalystFilters) reader;
-    filterable.pushCatalystFilters(expressions);
-  }
-
-  private List<DataReaderFactory<UnsafeRow>> planTasks(DataSourceReader reader) {
-    Assert.assertTrue(reader instanceof SupportsScanUnsafeRow);
-    SupportsScanUnsafeRow unsafeReader = (SupportsScanUnsafeRow) reader;
-    return unsafeReader.createUnsafeRowReaderFactories();
+  private void pushFilters(DataSourceReader reader, Filter... filters) {
+    Assert.assertTrue(reader instanceof SupportsPushDownFilters);
+    SupportsPushDownFilters filterable = (SupportsPushDownFilters) reader;
+    filterable.pushFilters(filters);
   }
 
   private File buildPartitionedTable(String desc, PartitionSpec spec, String udf, String partitionColumn) {
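
The test updates above all follow one pattern: filters are pushed as org.apache.spark.sql.sources.Filter values through SupportsPushDownFilters, and read tasks are planned with planInputPartitions(). A small sketch of that flow, assuming a hypothetical helper rather than the test's private methods:

import java.util.List;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.And;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.GreaterThan;
import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;

class FilterPushdownExample {
  // Hypothetical helper mirroring the pushFilters + planInputPartitions pattern in the tests.
  static List<InputPartition<InternalRow>> planWithFilters(DataSourceReader reader, Filter... filters) {
    // Spark 2.4 sources receive data source Filter objects instead of Catalyst Expressions
    ((SupportsPushDownFilters) reader).pushFilters(filters);
    return reader.planInputPartitions();
  }

  static Filter idEquals(int id) {
    // the Filter case classes are constructed through apply() when called from Java
    return EqualTo.apply("id", id);
  }

  static Filter tsWindow() {
    return And.apply(
        GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00"),
        LessThan.apply("ts", "2017-12-22T08:00:00+00:00"));
  }
}
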
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestOrcScan.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestOrcScan.java
deleted file mode 100644 (file)
index 4a6fb26..0000000
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.netflix.iceberg.spark.source;
-
-import com.netflix.iceberg.DataFile;
-import com.netflix.iceberg.DataFiles;
-import com.netflix.iceberg.FileFormat;
-import com.netflix.iceberg.Metrics;
-import com.netflix.iceberg.PartitionSpec;
-import com.netflix.iceberg.Schema;
-import com.netflix.iceberg.Table;
-import com.netflix.iceberg.hadoop.HadoopTables;
-import com.netflix.iceberg.io.FileAppender;
-import com.netflix.iceberg.orc.ORC;
-import com.netflix.iceberg.orc.OrcFileAppender;
-import com.netflix.iceberg.spark.data.AvroDataTest;
-import com.netflix.iceberg.spark.data.RandomData;
-import com.netflix.iceberg.spark.data.SparkOrcWriter;
-import com.netflix.iceberg.spark.data.TestHelpers;
-import com.netflix.iceberg.types.Type;
-import com.netflix.iceberg.types.Types;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.storage.serde2.io.DateWritable;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.DateTimeUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-
-import static com.netflix.iceberg.Files.localOutput;
-
-public class TestOrcScan extends AvroDataTest {
-  private static final Configuration CONF = new Configuration();
-
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
-
-  private static SparkSession spark = null;
-
-  @BeforeClass
-  public static void startSpark() {
-    TestOrcScan.spark = SparkSession.builder().master("local[2]").getOrCreate();
-  }
-
-  @AfterClass
-  public static void stopSpark() {
-    SparkSession spark = TestOrcScan.spark;
-    TestOrcScan.spark = null;
-    spark.stop();
-  }
-
-  @Override
-  protected void writeAndValidate(Schema schema) throws IOException {
-    System.out.println("Starting ORC test with " + schema);
-    final int ROW_COUNT = 100;
-    final long SEED = 1;
-    File parent = temp.newFolder("orc");
-    File location = new File(parent, "test");
-    File dataFolder = new File(location, "data");
-    dataFolder.mkdirs();
-
-    File orcFile = new File(dataFolder,
-        FileFormat.ORC.addExtension(UUID.randomUUID().toString()));
-
-    HadoopTables tables = new HadoopTables(CONF);
-    Table table = tables.create(schema, PartitionSpec.unpartitioned(),
-        location.toString());
-
-    // Important: use the table's schema for the rest of the test
-    // When tables are created, the column ids are reassigned.
-    Schema tableSchema = table.schema();
-
-    Metrics metrics;
-    SparkOrcWriter writer = new SparkOrcWriter(ORC.write(localOutput(orcFile))
-        .schema(tableSchema)
-        .build());
-    try {
-      writer.addAll(RandomData.generateSpark(tableSchema, ROW_COUNT, SEED));
-    } finally {
-      writer.close();
-      // close writes the last batch, so metrics are not correct until after close is called
-      metrics = writer.metrics();
-    }
-
-    DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
-        .withFileSizeInBytes(orcFile.length())
-        .withPath(orcFile.toString())
-        .withMetrics(metrics)
-        .build();
-
-    table.newAppend().appendFile(file).commit();
-
-    Dataset<Row> df = spark.read()
-        .format("iceberg")
-        .load(location.toString());
-
-    List<Row> rows = df.collectAsList();
-    Assert.assertEquals("Wrong number of rows", ROW_COUNT, rows.size());
-    Iterator<InternalRow> expected = RandomData.generateSpark(tableSchema,
-        ROW_COUNT, SEED);
-    for(int i=0; i < ROW_COUNT; ++i) {
-      TestHelpers.assertEquals("row " + i, schema.asStruct(), expected.next(),
-          rows.get(i));
-    }
-  }
-}
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestOrcWrite.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestOrcWrite.java
deleted file mode 100644 (file)
index bc12670..0000000
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.netflix.iceberg.spark.source;
-
-import com.google.common.collect.Lists;
-import com.netflix.iceberg.FileFormat;
-import com.netflix.iceberg.PartitionSpec;
-import com.netflix.iceberg.Schema;
-import com.netflix.iceberg.Table;
-import com.netflix.iceberg.hadoop.HadoopTables;
-import com.netflix.iceberg.types.Types;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcConf;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-
-import static com.netflix.iceberg.types.Types.NestedField.optional;
-
-public class TestOrcWrite {
-  private static final Configuration CONF = new Configuration();
-  private static final Schema SCHEMA = new Schema(
-      optional(1, "id", Types.IntegerType.get()),
-      optional(2, "data", Types.StringType.get())
-  );
-
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
-
-  private static SparkSession spark = null;
-
-  @BeforeClass
-  public static void startSpark() {
-    TestOrcWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
-  }
-
-  @AfterClass
-  public static void stopSpark() {
-    SparkSession spark = TestOrcWrite.spark;
-    TestOrcWrite.spark = null;
-    spark.stop();
-  }
-
-  @Test
-  public void testBasicWrite() throws IOException {
-    File parent = temp.newFolder("orc");
-    File location = new File(parent, "test");
-    location.mkdirs();
-
-    HadoopTables tables = new HadoopTables(CONF);
-    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
-    Table table = tables.create(SCHEMA, spec, location.toString());
-    table.updateProperties()
-        .defaultFormat(FileFormat.ORC)
-        .set(OrcConf.COMPRESS.getAttribute(), CompressionKind.NONE.name())
-        .commit();
-
-    List<SimpleRecord> expected = Lists.newArrayList(
-        new SimpleRecord(1, "a"),
-        new SimpleRecord(2, "b"),
-        new SimpleRecord(3, "c")
-    );
-
-    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
-
-    // TODO: incoming columns must be ordered according to the table's schema
-    df.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(location.toString());
-
-    table.refresh();
-
-    Dataset<Row> result = spark.read()
-        .format("iceberg")
-        .load(location.toString());
-
-    List<SimpleRecord> actual = result.orderBy("id").as(
-        Encoders.bean(SimpleRecord.class)).collectAsList();
-
-    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
-    Assert.assertEquals("Result rows should match", expected, actual);
-  }
-}
index be6876b..d35bba3 100644 (file)
@@ -88,7 +88,7 @@ public class TestSparkReadProjection extends TestReadProjection {
 
   @Override
   protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema,
-                                            Record record) throws IOException {
+                                Record record) throws IOException {
     File parent = temp.newFolder(desc);
     File location = new File(parent, "test");
     File dataFolder = new File(location, "data");
index fc9253e..a7ff513 100644 (file)
@@ -51,9 +51,18 @@ class TestTables {
 
   static TestTable load(String name) {
     TestTableOperations ops = new TestTableOperations(name);
+    if (ops.current() == null) {
+      return null;
+    }
     return new TestTable(ops, name);
   }
 
+  static boolean drop(String name) {
+    synchronized (METADATA) {
+      return METADATA.remove(name) != null;
+    }
+  }
+
   static class TestTable extends BaseTable {
     private final TestTableOperations ops;