Fix type handling in Spark and Pig. (#49)
author Ryan Blue <rdblue@users.noreply.github.com>
Thu, 13 Dec 2018 16:40:17 +0000 (08:40 -0800)
committer GitHub <noreply@github.com>
Thu, 13 Dec 2018 16:40:17 +0000 (08:40 -0800)
pig/src/main/java/com/netflix/iceberg/pig/PigParquetReader.java
spark/src/main/java/com/netflix/iceberg/spark/data/SparkParquetReaders.java

diff --git a/pig/src/main/java/com/netflix/iceberg/pig/PigParquetReader.java b/pig/src/main/java/com/netflix/iceberg/pig/PigParquetReader.java
index 1de637f..dbd9282 100644
@@ -38,6 +38,7 @@ import com.netflix.iceberg.parquet.ParquetValueReaders.StringReader;
 import com.netflix.iceberg.parquet.ParquetValueReaders.StructReader;
 import com.netflix.iceberg.parquet.ParquetValueReaders.UnboxedReader;
 import com.netflix.iceberg.parquet.TypeWithSchemaVisitor;
+import com.netflix.iceberg.types.Type.TypeID;
 import com.netflix.iceberg.types.Types;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.DecimalMetadata;
@@ -208,8 +209,10 @@ public class PigParquetReader {
           case INT_8:
           case INT_16:
           case INT_32:
-            if(expected.typeId() == Types.LongType.get().typeId()) {
+            if (expected != null && expected.typeId() == Types.LongType.get().typeId()) {
               return new IntAsLongReader(desc);
+            } else {
+              return new UnboxedReader<>(desc);
             }
           case INT_64: return new UnboxedReader<>(desc);
           case TIMESTAMP_MILLIS: return new TimestampMillisReader(desc);
@@ -234,13 +237,20 @@ public class PigParquetReader {
         case FIXED_LEN_BYTE_ARRAY:
         case BINARY:
           return new BytesReader(desc);
-        case BOOLEAN:
         case INT32:
-        case INT64:
+          if (expected != null && expected.typeId() == TypeID.LONG) {
+            return new IntAsLongReader(desc);
+          } else {
+            return new UnboxedReader<>(desc);
+          }
         case FLOAT:
-          if(expected.typeId() == Types.DoubleType.get().typeId()) {
+          if (expected != null && expected.typeId() == TypeID.DOUBLE) {
             return new FloatAsDoubleReader(desc);
+          } else {
+            return new UnboxedReader<>(desc);
           }
+        case BOOLEAN:
+        case INT64:
         case DOUBLE:
           return new UnboxedReader<>(desc);
         default:
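
The Pig change above does two things: it makes the expected-type check null-safe (the old code called expected.typeId() unconditionally), and it untangles the case fall-through so that BOOLEAN, INT32, and INT64 columns no longer reach the FLOAT-to-double promotion, while INT32 can still be widened to long and FLOAT to double when the expected Iceberg type asks for it. Below is a minimal illustrative sketch of that null-safe widening decision, using a hypothetical WideningCheckSketch class and helper names; the real logic lives in PigParquetReader's visitor and returns ParquetValueReaders instances built from a ColumnDescriptor.

// Illustrative only: hypothetical class and method names; the decision mirrors
// the checks added in this patch, not the project's actual API surface.
import com.netflix.iceberg.types.Type;
import com.netflix.iceberg.types.Type.TypeID;

public class WideningCheckSketch {
  /** True when a Parquet INT32-backed column should be read as a Java long. */
  static boolean readIntAsLong(Type expected) {
    // The pre-patch code dereferenced `expected` unconditionally, which could
    // throw a NullPointerException when no expected Iceberg type was supplied.
    return expected != null && expected.typeId() == TypeID.LONG;
  }

  /** True when a Parquet FLOAT column should be read as a Java double. */
  static boolean readFloatAsDouble(Type expected) {
    return expected != null && expected.typeId() == TypeID.DOUBLE;
  }
}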
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/com/netflix/iceberg/spark/data/SparkParquetReaders.java
index 1791014..336ffb1 100644
@@ -204,10 +204,15 @@ public class SparkParquetReaders {
           case JSON:
           case UTF8:
             return new StringReader(desc);
-          case DATE:
           case INT_8:
           case INT_16:
           case INT_32:
+            if (expected != null && expected.typeId() == Types.LongType.get().typeId()) {
+              return new IntAsLongReader(desc);
+            } else {
+              return new UnboxedReader<>(desc);
+            }
+          case DATE:
           case INT_64:
           case TIMESTAMP_MICROS:
             return new UnboxedReader<>(desc);
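
The Spark change mirrors the Pig one: INT_8/INT_16/INT_32 now choose their reader (widening to long only when the expected Iceberg type is a long) before DATE, which keeps falling through to the plain UnboxedReader along with INT_64 and TIMESTAMP_MICROS. The sketch below shows the resulting selection order, assuming a hypothetical describeReader helper that names the reader rather than constructing it; the actual method builds readers from a ColumnDescriptor.

// Illustrative only: names the reader the patched switch would choose for a few
// Parquet annotations; hypothetical helper, not the project's API.
import com.netflix.iceberg.types.Type;
import com.netflix.iceberg.types.Type.TypeID;
import org.apache.parquet.schema.OriginalType;

public class ReaderSelectionSketch {
  static String describeReader(OriginalType annotation, Type expected) {
    switch (annotation) {
      case INT_8:
      case INT_16:
      case INT_32:
        // Widen only when the projected Iceberg type asks for a long; `expected`
        // may be null, so check it before dereferencing.
        if (expected != null && expected.typeId() == TypeID.LONG) {
          return "IntAsLongReader";
        } else {
          return "UnboxedReader";
        }
      case DATE:
      case INT_64:
      case TIMESTAMP_MICROS:
        return "UnboxedReader";
      default:
        throw new UnsupportedOperationException("Unsupported annotation: " + annotation);
    }
  }
}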