SQOOP-3382: Add numeric support for Parquet in HDFS import
author Szabolcs Vasas <vasas@apache.org>
Wed, 14 Nov 2018 10:29:02 +0000 (11:29 +0100)
committer Szabolcs Vasas <vasas@apache.org>
Wed, 14 Nov 2018 10:29:02 +0000 (11:29 +0100)
(Fero Szabo via Szabolcs Vasas)

17 files changed:
src/java/org/apache/sqoop/config/ConfigurationConstants.java
src/java/org/apache/sqoop/mapreduce/ParquetImportMapper.java
src/java/org/apache/sqoop/mapreduce/parquet/hadoop/HadoopParquetImportJobConfigurator.java
src/java/org/apache/sqoop/orm/AvroSchemaGenerator.java
src/test/org/apache/sqoop/importjob/NumericTypesImportTest.java [moved from src/test/org/apache/sqoop/importjob/avro/AvroImportForNumericTypesTest.java with 52% similarity]
src/test/org/apache/sqoop/importjob/SplitByImportTest.java
src/test/org/apache/sqoop/importjob/configuration/AvroTestConfiguration.java [new file with mode: 0644]
src/test/org/apache/sqoop/importjob/configuration/GenericImportJobSplitByTestConfiguration.java
src/test/org/apache/sqoop/importjob/configuration/ImportJobTestConfiguration.java [moved from src/test/org/apache/sqoop/importjob/ImportJobTestConfiguration.java with 93% similarity]
src/test/org/apache/sqoop/importjob/configuration/MSSQLServerImportJobTestConfiguration.java [moved from src/test/org/apache/sqoop/importjob/avro/configuration/MSSQLServerImportJobTestConfiguration.java with 81% similarity]
src/test/org/apache/sqoop/importjob/configuration/MySQLImportJobTestConfiguration.java [moved from src/test/org/apache/sqoop/importjob/avro/configuration/MySQLImportJobTestConfiguration.java with 81% similarity]
src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfiguration.java [moved from src/test/org/apache/sqoop/importjob/avro/configuration/OracleImportJobTestConfiguration.java with 82% similarity]
src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfigurationForNumber.java [moved from src/test/org/apache/sqoop/importjob/avro/configuration/OracleImportJobTestConfigurationForNumber.java with 83% similarity]
src/test/org/apache/sqoop/importjob/configuration/ParquetTestConfiguration.java [new file with mode: 0644]
src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationForNumeric.java [moved from src/test/org/apache/sqoop/importjob/avro/configuration/PostgresqlImportJobTestConfigurationForNumeric.java with 82% similarity]
src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java [moved from src/test/org/apache/sqoop/importjob/avro/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java with 80% similarity]
src/test/org/apache/sqoop/util/ParquetReader.java

index 3724f25..7592846 100644
@@ -97,6 +97,11 @@ public final class ConfigurationConstants {
   public static final String PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL = "sqoop.avro.logical_types.decimal.enable";
 
   /**
+   * Enable parquet logical types (decimal support only).
+   */
+  public static final String PROP_ENABLE_PARQUET_LOGICAL_TYPE_DECIMAL = "sqoop.parquet.logical_types.decimal.enable";
+
+  /**
    * Default precision for avro schema
    */
   public static final String PROP_AVRO_DECIMAL_PRECISION = "sqoop.avro.logical_types.decimal.default.precision";
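For reference, the new flag defaults to false and mirrors the existing Avro switch above. A minimal sketch of turning on Parquet decimal support together with the pre-existing padding flag (the same properties can also be passed to sqoop as -D options on the command line):

    import org.apache.hadoop.conf.Configuration;

    public class ParquetDecimalFlagsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Emit DECIMAL logical types in the generated Parquet schema:
        conf.setBoolean("sqoop.parquet.logical_types.decimal.enable", true);
        // Pad values with zeros up to the declared scale (needed for Oracle and Postgres):
        conf.setBoolean("sqoop.avro.decimal_padding.enable", true);
        System.out.println(conf.getBoolean("sqoop.parquet.logical_types.decimal.enable", false));
      }
    }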
index 62334f8..b386079 100644
@@ -18,6 +18,9 @@
 
 package org.apache.sqoop.mapreduce;
 
+import org.apache.avro.Conversions;
+import org.apache.avro.generic.GenericData;
+import org.apache.sqoop.config.ConfigurationConstants;
 import org.apache.sqoop.lib.LargeObjectLoader;
 import org.apache.sqoop.lib.SqoopRecord;
 import org.apache.avro.Schema;
@@ -39,6 +42,7 @@ public abstract class ParquetImportMapper<KEYOUT, VALOUT>
   private Schema schema = null;
   private boolean bigDecimalFormatString = true;
   private LargeObjectLoader lobLoader = null;
+  private boolean bigDecimalPadding;
 
   @Override
   protected void setup(Context context)
@@ -49,6 +53,8 @@ public abstract class ParquetImportMapper<KEYOUT, VALOUT>
         ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
         ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
     lobLoader = createLobLoader(context);
+    GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
+    bigDecimalPadding = conf.getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_DECIMAL_PADDING, false);
   }
 
   @Override
@@ -62,7 +68,7 @@ public abstract class ParquetImportMapper<KEYOUT, VALOUT>
     }
 
     GenericRecord record = AvroUtil.toGenericRecord(val.getFieldMap(), schema,
-        bigDecimalFormatString);
+        bigDecimalFormatString, bigDecimalPadding);
     write(context, record);
   }
 
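Registering DecimalConversion in setup() is what lets Avro's GenericData map java.math.BigDecimal values to the bytes-backed decimal logical type and back. A self-contained sketch of that round trip, assuming Avro 1.8+ on the classpath:

    import java.math.BigDecimal;
    import java.nio.ByteBuffer;
    import org.apache.avro.Conversions;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class DecimalConversionSketch {
      public static void main(String[] args) {
        Schema bytes = Schema.create(Schema.Type.BYTES);
        LogicalTypes.Decimal decimal = LogicalTypes.decimal(10, 2);
        Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
        // BigDecimal -> two's-complement bytes, as stored in the written records:
        ByteBuffer encoded = conversion.toBytes(new BigDecimal("1000000.05"), bytes, decimal);
        // ...and back again on the read path:
        System.out.println(conversion.fromBytes(encoded, bytes, decimal)); // prints 1000000.05
      }
    }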
index e821543..aa9740b 100644
@@ -25,7 +25,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.parquet.avro.GenericDataSupplier;
 import org.apache.sqoop.SqoopOptions;
+import org.apache.sqoop.config.ConfigurationConstants;
 import org.apache.sqoop.mapreduce.parquet.ParquetImportJobConfigurator;
 import org.apache.parquet.avro.AvroParquetOutputFormat;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
@@ -46,6 +48,18 @@ public class HadoopParquetImportJobConfigurator implements ParquetImportJobConfi
   public void configureMapper(Job job, Schema schema, SqoopOptions options, String tableName, Path destination) throws IOException {
     configureAvroSchema(job, schema);
     configureOutputCodec(job);
+    configureLogicalTypeSupport(job, options);
+  }
+
+  /**
+   * Configures support for logical types, i.e. decimal, in Parquet.
+   * @param job the import job to configure
+   * @param options the Sqoop options carrying the user's configuration
+   */
+  private void configureLogicalTypeSupport(Job job, SqoopOptions options) {
+    if (options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_PARQUET_LOGICAL_TYPE_DECIMAL, false)) {
+      AvroParquetOutputFormat.setAvroDataSupplier(job, GenericDataSupplier.class);
+    }
   }
 
   @Override
index 7a2a5f9..05ac46c 100644
@@ -39,6 +39,9 @@ import org.apache.sqoop.avro.AvroUtil;
 import org.apache.sqoop.config.ConfigurationConstants;
 import org.codehaus.jackson.node.NullNode;
 
+import static org.apache.sqoop.SqoopOptions.FileLayout.AvroDataFile;
+import static org.apache.sqoop.SqoopOptions.FileLayout.ParquetFile;
+
 /**
  * Creates an Avro schema to represent a table from a database.
  */
@@ -126,8 +129,7 @@ public class AvroSchemaGenerator {
   public Schema toAvroSchema(int sqlType, String columnName, Integer precision, Integer scale) {
     List<Schema> childSchemas = new ArrayList<Schema>();
     childSchemas.add(Schema.create(Schema.Type.NULL));
-    if (options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, false)
-        && isLogicalType(sqlType)) {
+    if (isLogicalTypeConversionEnabled() && isLogicalType(sqlType)) {
       childSchemas.add(
           toAvroLogicalType(columnName, sqlType, precision, scale)
               .addToSchema(Schema.create(Type.BYTES))
@@ -138,6 +140,20 @@ public class AvroSchemaGenerator {
     return Schema.createUnion(childSchemas);
   }
 
+  /**
+   * @return True if this is a parquet import and parquet logical types are enabled,
+   * or if this is an avro import and avro logical types are enabled. False otherwise.
+   */
+  private boolean isLogicalTypeConversionEnabled() {
+    if (ParquetFile.equals(options.getFileLayout())) {
+      return options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_PARQUET_LOGICAL_TYPE_DECIMAL, false);
+    }
+    else if (AvroDataFile.equals(options.getFileLayout())) {
+      return options.getConf().getBoolean(ConfigurationConstants.PROP_ENABLE_AVRO_LOGICAL_TYPE_DECIMAL, false);
+    }
+    return false;
+  }
+
   public Schema toAvroSchema(int sqlType) {
     return toAvroSchema(sqlType, null, null, null);
   }
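With the flag on, each matching numeric column is generated as a union of null and decimal-annotated bytes. A sketch of the schema shape toAvroSchema() emits, using an illustrative DECIMAL(10,2) column:

    import java.util.Arrays;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class DecimalSchemaSketch {
      public static void main(String[] args) {
        Schema decimalBytes = LogicalTypes.decimal(10, 2)
            .addToSchema(Schema.create(Schema.Type.BYTES));
        Schema union = Schema.createUnion(
            Arrays.asList(Schema.create(Schema.Type.NULL), decimalBytes));
        // Prints ["null",{"type":"bytes","logicalType":"decimal","precision":10,"scale":2}]
        System.out.println(union);
      }
    }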
diff --git a/src/test/org/apache/sqoop/importjob/avro/AvroImportForNumericTypesTest.java b/src/test/org/apache/sqoop/importjob/NumericTypesImportTest.java
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.avro;
+package org.apache.sqoop.importjob;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
 import org.apache.sqoop.SqoopOptions;
-import org.apache.sqoop.importjob.ImportJobTestConfiguration;
+import org.apache.sqoop.importjob.configuration.AvroTestConfiguration;
+import org.apache.sqoop.importjob.configuration.MSSQLServerImportJobTestConfiguration;
+import org.apache.sqoop.importjob.configuration.MySQLImportJobTestConfiguration;
+import org.apache.sqoop.importjob.configuration.OracleImportJobTestConfiguration;
+import org.apache.sqoop.importjob.configuration.ParquetTestConfiguration;
+import org.apache.sqoop.importjob.configuration.PostgresqlImportJobTestConfigurationForNumeric;
 import org.apache.sqoop.testutil.ArgumentArrayBuilder;
 import org.apache.sqoop.testutil.AvroTestUtils;
 import org.apache.sqoop.testutil.ImportJobTestCase;
@@ -31,12 +39,9 @@ import org.apache.sqoop.testutil.adapter.MSSQLServerDatabaseAdapter;
 import org.apache.sqoop.testutil.adapter.MySqlDatabaseAdapter;
 import org.apache.sqoop.testutil.adapter.OracleDatabaseAdapter;
 import org.apache.sqoop.testutil.adapter.PostgresDatabaseAdapter;
-import org.apache.sqoop.importjob.avro.configuration.MSSQLServerImportJobTestConfiguration;
-import org.apache.sqoop.importjob.avro.configuration.MySQLImportJobTestConfiguration;
-import org.apache.sqoop.importjob.avro.configuration.OracleImportJobTestConfigurationForNumber;
-import org.apache.sqoop.importjob.avro.configuration.OracleImportJobTestConfiguration;
-import org.apache.sqoop.importjob.avro.configuration.PostgresqlImportJobTestConfigurationForNumeric;
-import org.apache.sqoop.importjob.avro.configuration.PostgresqlImportJobTestConfigurationPaddingShouldSucceed;
+import org.apache.sqoop.importjob.configuration.OracleImportJobTestConfigurationForNumber;
+import org.apache.sqoop.importjob.configuration.PostgresqlImportJobTestConfigurationPaddingShouldSucceed;
+import org.apache.sqoop.util.ParquetReader;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -51,6 +56,10 @@ import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+import static org.apache.sqoop.SqoopOptions.FileLayout.AvroDataFile;
+import static org.apache.sqoop.SqoopOptions.FileLayout.ParquetFile;
+
 @RunWith(Parameterized.class)
 /**
  * This test covers the behavior of the Avro import for fixed point decimal types, i.e. NUMBER, NUMERIC
@@ -59,21 +68,20 @@ import java.util.List;
  * Oracle and Postgres store numbers without padding, while other DBs store them padded with 0s.
  *
  * The features tested here affect two phases in Sqoop:
- * 1. Avro schema generation
+ * 1. Avro schema generation during Avro and Parquet import
  * Default precision and scale are used here to avoid issues with Oracle and Postgres, as these
  * don't return valid precision and scale if they weren't specified in the table DDL.
  *
- * 2. Avro import: padding.
+ * 2. Decimal padding during Avro or Parquet import
  * In case of Oracle and Postgres, Sqoop has to pad the values with 0s to avoid errors.
  */
-public class AvroImportForNumericTypesTest extends ImportJobTestCase {
+public class NumericTypesImportTest<T extends AvroTestConfiguration & ParquetTestConfiguration> extends ImportJobTestCase {
 
-  public static final Log LOG = LogFactory.getLog(
-      AvroImportForNumericTypesTest.class.getName());
+  public static final Log LOG = LogFactory.getLog(NumericTypesImportTest.class.getName());
 
   private Configuration conf = new Configuration();
 
-  private final ImportJobTestConfiguration configuration;
+  private final T configuration;
   private final DatabaseAdapter adapter;
   private final boolean failWithoutExtraArgs;
   private final boolean failWithPadding;
@@ -86,6 +94,7 @@ public class AvroImportForNumericTypesTest extends ImportJobTestCase {
   // Constants for the test case that has padding specified but not default precision and scale.
   private final static boolean SUCCEED_WITH_PADDING_ONLY = false;
   private final static boolean FAIL_WITH_PADDING_ONLY = true;
+  private Path tableDirPath;
 
   @Parameters(name = "Adapter: {0}| Config: {1}| failWithoutExtraArgs: {2}| failWithPadding: {3}")
   public static Iterable<? extends Object> testConfigurations() {
@@ -101,7 +110,7 @@ public class AvroImportForNumericTypesTest extends ImportJobTestCase {
     );
   }
 
-  public AvroImportForNumericTypesTest(DatabaseAdapter adapter, ImportJobTestConfiguration configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
+  public NumericTypesImportTest(DatabaseAdapter adapter, T configuration, boolean failWithoutExtraArgs, boolean failWithPaddingOnly) {
     this.adapter = adapter;
     this.configuration = configuration;
     this.failWithoutExtraArgs = failWithoutExtraArgs;
@@ -148,6 +157,7 @@ public class AvroImportForNumericTypesTest extends ImportJobTestCase {
     for (String[] input  : inputData) {
       insertIntoTable(names, types, input);
     }
+    tableDirPath = new Path(getWarehouseDir() + "/" + getTableName());
   }
 
   @After
@@ -160,51 +170,158 @@ public class AvroImportForNumericTypesTest extends ImportJobTestCase {
     super.tearDown();
   }
 
-  private ArgumentArrayBuilder getArgsBuilder() {
-    ArgumentArrayBuilder builder = AvroTestUtils.getBuilderForAvroPaddingTest(this);
-    builder.withOption("connect", getConnectString());
-    return builder;
+  private ArgumentArrayBuilder getArgsBuilder(SqoopOptions.FileLayout fileLayout) {
+    ArgumentArrayBuilder builder = new ArgumentArrayBuilder();
+    if (AvroDataFile.equals(fileLayout)) {
+      builder.withOption("as-avrodatafile");
+    }
+    else if (ParquetFile.equals(fileLayout)) {
+      builder.withOption("as-parquetfile");
+    }
+
+    return builder.withCommonHadoopFlags(true)
+        .withOption("warehouse-dir", getWarehouseDir())
+        .withOption("num-mappers", "1")
+        .withOption("table", getTableName())
+        .withOption("connect", getConnectString());
   }
 
-  @Test
-  public void testAvroImportWithoutPadding() throws IOException {
-    if (failWithoutExtraArgs) {
+  /**
+   * Adds properties for decimal precision and scale to the given arg builder.
+   * @param builder the argument array builder to extend
+   */
+  private void addPrecisionAndScale(ArgumentArrayBuilder builder) {
+    builder.withProperty("sqoop.avro.logical_types.decimal.default.precision", "38");
+    builder.withProperty("sqoop.avro.logical_types.decimal.default.scale", "3");
+  }
+
+  /**
+   * Enables zero-padding for decimals in Avro and Parquet imports.
+   * @param builder the argument array builder to extend
+   */
+  private void addPadding(ArgumentArrayBuilder builder) {
+    builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
+  }
+
+  private void addEnableAvroDecimal(ArgumentArrayBuilder builder) {
+    builder.withProperty("sqoop.avro.logical_types.decimal.enable", "true");
+  }
+
+  private void addEnableParquetDecimal(ArgumentArrayBuilder builder) {
+    builder.withProperty("sqoop.parquet.logical_types.decimal.enable", "true");
+  }
+
+  private void configureJunitToExpectFailure(boolean failWithPadding) {
+    if (failWithPadding) {
       thrown.expect(IOException.class);
       thrown.expectMessage("Failure during job; return status 1");
     }
-    String[] args = getArgsBuilder().build();
+  }
+
+  @Test
+  public void testAvroImportWithoutPadding() throws IOException {
+    configureJunitToExpectFailure(failWithoutExtraArgs);
+    ArgumentArrayBuilder builder = getArgsBuilder(AvroDataFile);
+    addEnableAvroDecimal(builder);
+    String[] args = builder.build();
     runImport(args);
     if (!failWithoutExtraArgs) {
-      verify();
+      verify(AvroDataFile);
     }
   }
 
   @Test
   public void testAvroImportWithPadding() throws IOException {
-    if (failWithPadding) {
-      thrown.expect(IOException.class);
-      thrown.expectMessage("Failure during job; return status 1");
-    }
-    ArgumentArrayBuilder builder = getArgsBuilder();
-    builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
+    configureJunitToExpectFailure(failWithPadding);
+    ArgumentArrayBuilder builder = getArgsBuilder(AvroDataFile);
+    addEnableAvroDecimal(builder);
+    addPadding(builder);
     runImport(builder.build());
     if (!failWithPadding) {
-      verify();
+      verify(AvroDataFile);
     }
   }
 
   @Test
   public void testAvroImportWithDefaultPrecisionAndScale() throws  IOException {
-    ArgumentArrayBuilder builder = getArgsBuilder();
-    builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
-    builder.withProperty("sqoop.avro.logical_types.decimal.default.precision", "38");
-    builder.withProperty("sqoop.avro.logical_types.decimal.default.scale", "3");
+    ArgumentArrayBuilder builder = getArgsBuilder(AvroDataFile);
+    addEnableAvroDecimal(builder);
+    addPadding(builder);
+    addPrecisionAndScale(builder);
     runImport(builder.build());
-    verify();
+    verify(AvroDataFile);
+  }
+
+  @Test
+  public void testParquetImportWithoutPadding() throws IOException {
+    configureJunitToExpectFailure(failWithoutExtraArgs);
+    ArgumentArrayBuilder builder = getArgsBuilder(ParquetFile);
+    addEnableParquetDecimal(builder);
+    String[] args = builder.build();
+    runImport(args);
+    if (!failWithoutExtraArgs) {
+      verify(ParquetFile);
+    }
+  }
+
+  @Test
+  public void testParquetImportWithPadding() throws IOException {
+    configureJunitToExpectFailure(failWithPadding);
+    ArgumentArrayBuilder builder = getArgsBuilder(ParquetFile);
+    addEnableParquetDecimal(builder);
+    addPadding(builder);
+    runImport(builder.build());
+    if (!failWithPadding) {
+      verify(ParquetFile);
+    }
+  }
+
+  @Test
+  public void testParquetImportWithDefaultPrecisionAndScale() throws IOException {
+    ArgumentArrayBuilder builder = getArgsBuilder(ParquetFile);
+    addEnableParquetDecimal(builder);
+    addPadding(builder);
+    addPrecisionAndScale(builder);
+    runImport(builder.build());
+    verify(ParquetFile);
+  }
+
+  private void verify(SqoopOptions.FileLayout fileLayout) {
+    if (AvroDataFile.equals(fileLayout)) {
+      AvroTestUtils.registerDecimalConversionUsageForVerification();
+      AvroTestUtils.verify(configuration.getExpectedResultsForAvro(), getConf(), getTablePath());
+    } else if (ParquetFile.equals(fileLayout)) {
+      verifyParquetFile();
+    }
+  }
+
+  private void verifyParquetFile() {
+    verifyParquetSchema();
+    verifyParquetContent();
+  }
+
+  private void verifyParquetContent() {
+    ParquetReader reader = new ParquetReader(tableDirPath);
+    assertEquals(Arrays.asList(configuration.getExpectedResultsForParquet()), reader.readAllInCsvSorted());
+  }
+
+  private void verifyParquetSchema() {
+    ParquetReader reader = new ParquetReader(tableDirPath);
+    MessageType parquetSchema = reader.readParquetSchema();
+
+    String[] types = configuration.getTypes();
+    for (int i = 0; i < types.length; i++) {
+      String type = types[i];
+      if (isNumericSqlType(type)) {
+        OriginalType parquetFieldType = parquetSchema.getFields().get(i).getOriginalType();
+        assertEquals(OriginalType.DECIMAL, parquetFieldType);
+      }
+    }
   }
 
-  private void verify() {
-    AvroTestUtils.registerDecimalConversionUsageForVerification();
-    AvroTestUtils.verify(configuration.getExpectedResults(), getConf(), getTablePath());
+  private boolean isNumericSqlType(String type) {
+    return type.toUpperCase().startsWith("DECIMAL")
+        || type.toUpperCase().startsWith("NUMBER")
+        || type.toUpperCase().startsWith("NUMERIC");
   }
 }
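For orientation, a sketch of the argument array the Parquet padding tests assemble; the warehouse dir, table name, and connect string are placeholders:

    import org.apache.sqoop.testutil.ArgumentArrayBuilder;

    public class ParquetArgsSketch {
      public static void main(String[] args) {
        ArgumentArrayBuilder builder = new ArgumentArrayBuilder();
        builder.withOption("as-parquetfile");
        builder.withCommonHadoopFlags(true)
            .withOption("warehouse-dir", "/tmp/sqoop-warehouse") // placeholder
            .withOption("num-mappers", "1")
            .withOption("table", "NUMERIC_TYPES")                // placeholder
            .withOption("connect", "jdbc:hsqldb:mem:sketch");    // placeholder
        builder.withProperty("sqoop.parquet.logical_types.decimal.enable", "true");
        builder.withProperty("sqoop.avro.decimal_padding.enable", "true");
        System.out.println(String.join(" ", builder.build()));
      }
    }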
index 7977c0b..c6fe4f2 100644
@@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.sqoop.SqoopOptions;
 import org.apache.sqoop.importjob.configuration.GenericImportJobSplitByTestConfiguration;
+import org.apache.sqoop.importjob.configuration.ImportJobTestConfiguration;
+import org.apache.sqoop.importjob.configuration.ParquetTestConfiguration;
 import org.apache.sqoop.testutil.ArgumentArrayBuilder;
 import org.apache.sqoop.testutil.ImportJobTestCase;
 import org.apache.sqoop.testutil.adapter.DatabaseAdapter;
@@ -55,7 +57,7 @@ public class SplitByImportTest extends ImportJobTestCase {
 
   private Configuration conf = new Configuration();
 
-  private final ImportJobTestConfiguration configuration;
+  private final ParquetTestConfiguration configuration;
   private final DatabaseAdapter adapter;
 
   @Parameters(name = "Adapter: {0}| Config: {1}")
@@ -69,7 +71,7 @@ public class SplitByImportTest extends ImportJobTestCase {
     );
   }
 
-  public SplitByImportTest(DatabaseAdapter adapter, ImportJobTestConfiguration configuration) {
+  public SplitByImportTest(DatabaseAdapter adapter, ParquetTestConfiguration configuration) {
     this.adapter = adapter;
     this.configuration = configuration;
   }
@@ -148,6 +150,6 @@ public class SplitByImportTest extends ImportJobTestCase {
 
   private void verifyParquetFile() {
     ParquetReader reader = new ParquetReader(new Path(getWarehouseDir() + "/" + getTableName()), getConf());
-    assertEquals(asList(configuration.getExpectedResults()), reader.readAllInCsvSorted());
+    assertEquals(asList(configuration.getExpectedResultsForParquet()), reader.readAllInCsvSorted());
   }
 }
diff --git a/src/test/org/apache/sqoop/importjob/configuration/AvroTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/AvroTestConfiguration.java
new file mode 100644
index 0000000..1008899
--- /dev/null
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.importjob.configuration;
+
+public interface AvroTestConfiguration extends ImportJobTestConfiguration {
+
+  String[] getExpectedResultsForAvro();
+}
index f137b56..e99b526 100644
@@ -19,7 +19,6 @@
 package org.apache.sqoop.importjob.configuration;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.sqoop.importjob.ImportJobTestConfiguration;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -28,7 +27,7 @@ import java.util.List;
  * This test configuration intends to cover the fact that oracle stores these types without padding them with 0s,
  * therefore when importing into avro, one has to use the padding feature.
  */
-public class GenericImportJobSplitByTestConfiguration implements ImportJobTestConfiguration {
+public class GenericImportJobSplitByTestConfiguration implements ImportJobTestConfiguration, ParquetTestConfiguration {
 
   public static final String NAME_COLUMN = "NAME";
   public static final char SEPARATOR = ',';
@@ -65,7 +64,7 @@ public class GenericImportJobSplitByTestConfiguration implements ImportJobTestCo
   }
 
   @Override
-  public String[] getExpectedResults() {
+  public String[] getExpectedResultsForParquet() {
     return data.stream()
         .map(element -> StringUtils.join(element, SEPARATOR))
         .toArray(String[]::new);
diff --git a/src/test/org/apache/sqoop/importjob/ImportJobTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/ImportJobTestConfiguration.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob;
+package org.apache.sqoop.importjob.configuration;
 
 import java.util.List;
 
@@ -27,6 +27,4 @@ public interface ImportJobTestConfiguration {
   String[] getNames();
 
   List<String[]> getSampleData();
-
-  String[] getExpectedResults();
 }
diff --git a/src/test/org/apache/sqoop/importjob/avro/configuration/MSSQLServerImportJobTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/MSSQLServerImportJobTestConfiguration.java
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.avro.configuration;
-
-import org.apache.sqoop.importjob.ImportJobTestConfiguration;
+package org.apache.sqoop.importjob.configuration;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class MSSQLServerImportJobTestConfiguration implements ImportJobTestConfiguration {
+public class MSSQLServerImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
 
   @Override
   public String[] getTypes() {
@@ -47,7 +45,7 @@ public class MSSQLServerImportJobTestConfiguration implements ImportJobTestConfi
   }
 
   @Override
-  public String[] getExpectedResults() {
+  public String[] getExpectedResultsForAvro() {
     String expectedRecord = "{\"ID\": 1, \"N1\": 100, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
         "\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
     String[] expectedResult = new String[1];
@@ -56,6 +54,14 @@ public class MSSQLServerImportJobTestConfiguration implements ImportJobTestConfi
   }
 
   @Override
+  public String[] getExpectedResultsForParquet() {
+    String expectedRecord = "1,100,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000";
+    String[] expectedResult = new String[1];
+    expectedResult[0] = expectedRecord;
+    return expectedResult;
+  }
+
+  @Override
   public String toString() {
     return getClass().getSimpleName();
   }
diff --git a/src/test/org/apache/sqoop/importjob/avro/configuration/MySQLImportJobTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/MySQLImportJobTestConfiguration.java
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.avro.configuration;
-
-import org.apache.sqoop.importjob.ImportJobTestConfiguration;
+package org.apache.sqoop.importjob.configuration;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class MySQLImportJobTestConfiguration implements ImportJobTestConfiguration {
+public class MySQLImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
 
   @Override
   public String[] getTypes() {
@@ -46,9 +44,8 @@ public class MySQLImportJobTestConfiguration implements ImportJobTestConfigurati
     return inputData;
   }
 
-
   @Override
-  public String[] getExpectedResults() {
+  public String[] getExpectedResultsForAvro() {
     String expectedRecord = "{\"ID\": 1, \"N1\": 100, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
         "\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
     String[] expectedResult = new String[1];
@@ -57,6 +54,14 @@ public class MySQLImportJobTestConfiguration implements ImportJobTestConfigurati
   }
 
   @Override
+  public String[] getExpectedResultsForParquet() {
+    String expectedRecord = "1,100,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000";
+    String[] expectedResult = new String[1];
+    expectedResult[0] = expectedRecord;
+    return expectedResult;
+  }
+
+  @Override
   public String toString() {
     return getClass().getSimpleName();
   }
diff --git a/src/test/org/apache/sqoop/importjob/avro/configuration/OracleImportJobTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfiguration.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.avro.configuration;
-
-import org.apache.sqoop.importjob.ImportJobTestConfiguration;
+package org.apache.sqoop.importjob.configuration;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -27,7 +25,7 @@ import java.util.List;
  * This test configuration intends to cover the fact that oracle stores these types without padding them with 0s,
  * therefore when importing into avro, one has to use the padding feature.
  */
-public class OracleImportJobTestConfiguration implements ImportJobTestConfiguration {
+public class OracleImportJobTestConfiguration implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
 
   @Override
   public String[] getTypes() {
@@ -49,7 +47,7 @@ public class OracleImportJobTestConfiguration implements ImportJobTestConfigurat
   }
 
   @Override
-  public String[] getExpectedResults() {
+  public String[] getExpectedResultsForAvro() {
     String expectedRecord = "{\"ID\": 1, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
         "\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
     String[] expectedResult = new String[1];
@@ -58,6 +56,14 @@ public class OracleImportJobTestConfiguration implements ImportJobTestConfigurat
   }
 
   @Override
+  public String[] getExpectedResultsForParquet() {
+    String expectedRecord = "1,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000";
+    String[] expectedResult = new String[1];
+    expectedResult[0] = expectedRecord;
+    return expectedResult;
+  }
+
+  @Override
   public String toString() {
     return getClass().getSimpleName();
   }
diff --git a/src/test/org/apache/sqoop/importjob/avro/configuration/OracleImportJobTestConfigurationForNumber.java b/src/test/org/apache/sqoop/importjob/configuration/OracleImportJobTestConfigurationForNumber.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.avro.configuration;
-
-import org.apache.sqoop.importjob.ImportJobTestConfiguration;
+package org.apache.sqoop.importjob.configuration;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -30,7 +28,7 @@ import java.util.List;
  * Therefore, NUMBER requires special treatment.
  * The user has to specify precision and scale when importing into avro.
  */
-public class OracleImportJobTestConfigurationForNumber implements ImportJobTestConfiguration {
+public class OracleImportJobTestConfigurationForNumber implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
 
 
   @Override
@@ -51,7 +49,7 @@ public class OracleImportJobTestConfigurationForNumber implements ImportJobTestC
   }
 
   @Override
-  public String[] getExpectedResults() {
+  public String[] getExpectedResultsForAvro() {
     String expectedRecord = "{\"ID\": 1, \"N1\": 100.010, \"N2\": 100, \"N3\": 100.03000}";
     String[] expectedResult = new String[1];
     expectedResult[0] = expectedRecord;
@@ -59,6 +57,14 @@ public class OracleImportJobTestConfigurationForNumber implements ImportJobTestC
   }
 
   @Override
+  public String[] getExpectedResultsForParquet() {
+    String expectedRecord = "1,100.010,100,100.03000";
+    String[] expectedResult = new String[1];
+    expectedResult[0] = expectedRecord;
+    return expectedResult;
+  }
+
+  @Override
   public String toString() {
     return getClass().getSimpleName();
   }
diff --git a/src/test/org/apache/sqoop/importjob/configuration/ParquetTestConfiguration.java b/src/test/org/apache/sqoop/importjob/configuration/ParquetTestConfiguration.java
new file mode 100644
index 0000000..3c161d1
--- /dev/null
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.importjob.configuration;
+
+public interface ParquetTestConfiguration extends ImportJobTestConfiguration {
+
+  String[] getExpectedResultsForParquet();
+}
diff --git a/src/test/org/apache/sqoop/importjob/avro/configuration/PostgresqlImportJobTestConfigurationForNumeric.java b/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationForNumeric.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.avro.configuration;
-
-import org.apache.sqoop.importjob.ImportJobTestConfiguration;
+package org.apache.sqoop.importjob.configuration;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -28,7 +26,7 @@ import java.util.List;
  * for precision and scale for NUMERIC. Also, important, that the accompanying columns
  *  - NUMERIC(20) and NUMERIC(20, 5) don't get modified.
  */
-public class PostgresqlImportJobTestConfigurationForNumeric implements ImportJobTestConfiguration {
+public class PostgresqlImportJobTestConfigurationForNumeric implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
 
   @Override
   public String[] getTypes() {
@@ -50,7 +48,7 @@ public class PostgresqlImportJobTestConfigurationForNumeric implements ImportJob
   }
 
   @Override
-  public String[] getExpectedResults() {
+  public String[] getExpectedResultsForAvro() {
     String expectedRecord = "{\"ID\": 1, \"N1\": 100.010, \"N2\": 100, \"N3\": 100.01000}";
     String[] expectedResult = new String[1];
     expectedResult[0] = expectedRecord;
@@ -58,6 +56,14 @@ public class PostgresqlImportJobTestConfigurationForNumeric implements ImportJob
   }
 
   @Override
+  public String[] getExpectedResultsForParquet() {
+    String expectedRecord = "1,100.010,100,100.01000";
+    String[] expectedResult = new String[1];
+    expectedResult[0] = expectedRecord;
+    return expectedResult;
+  }
+
+  @Override
   public String toString() {
     return getClass().getSimpleName();
   }
diff --git a/src/test/org/apache/sqoop/importjob/avro/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java b/src/test/org/apache/sqoop/importjob/configuration/PostgresqlImportJobTestConfigurationPaddingShouldSucceed.java
  * limitations under the License.
  */
 
-package org.apache.sqoop.importjob.avro.configuration;
-
-import org.apache.sqoop.importjob.ImportJobTestConfiguration;
+package org.apache.sqoop.importjob.configuration;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class PostgresqlImportJobTestConfigurationPaddingShouldSucceed implements ImportJobTestConfiguration {
+public class PostgresqlImportJobTestConfigurationPaddingShouldSucceed implements ImportJobTestConfiguration, AvroTestConfiguration, ParquetTestConfiguration {
 
   @Override
   public String[] getTypes() {
@@ -47,7 +45,7 @@ public class PostgresqlImportJobTestConfigurationPaddingShouldSucceed implements
   }
 
   @Override
-  public String[] getExpectedResults() {
+  public String[] getExpectedResultsForAvro() {
     String expectedRecord = "{\"ID\": 1, \"N2\": 1000000, \"N3\": 1000000.05000, \"N4\": 1000000, \"N5\": 1000000.05000, " +
         "\"D1\": 100, \"D2\": 1000000, \"D3\": 1000000.05000, \"D4\": 1000000, \"D5\": 1000000.05000}";
     String[] expectedResult = new String[1];
@@ -56,6 +54,14 @@ public class PostgresqlImportJobTestConfigurationPaddingShouldSucceed implements
   }
 
   @Override
+  public String[] getExpectedResultsForParquet() {
+    String expectedRecord = "1,1000000,1000000.05000,1000000,1000000.05000,100,1000000,1000000.05000,1000000,1000000.05000";
+    String[] expectedResult = new String[1];
+    expectedResult[0] = expectedRecord;
+    return expectedResult;
+  }
+
+  @Override
   public String toString() {
     return getClass().getSimpleName();
   }
index 908ce56..727be58 100644
@@ -18,6 +18,8 @@
 
 package org.apache.sqoop.util;
 
+import org.apache.avro.Conversions;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -29,7 +31,9 @@ import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -65,7 +69,7 @@ public class ParquetReader implements AutoCloseable {
     this(pathToRead, new Configuration());
   }
 
-  public GenericRecord next() throws IOException {
+  private GenericRecord next() throws IOException {
     GenericRecord result = reader.read();
     if (result != null) {
       return result;
@@ -113,29 +117,38 @@ public class ParquetReader implements AutoCloseable {
   }
 
   public CompressionCodecName getCodec() {
-    List<Footer> footers = getFooters();
+    ParquetMetadata parquetMetadata = getParquetMetadata();
 
-    Iterator<Footer> footersIterator = footers.iterator();
-    if (footersIterator.hasNext()) {
-      Footer footer = footersIterator.next();
+    Iterator<BlockMetaData> blockMetaDataIterator = parquetMetadata.getBlocks().iterator();
+    if (blockMetaDataIterator.hasNext()) {
+      BlockMetaData blockMetaData = blockMetaDataIterator.next();
 
-      Iterator<BlockMetaData> blockMetaDataIterator = footer.getParquetMetadata().getBlocks().iterator();
-      if (blockMetaDataIterator.hasNext()) {
-        BlockMetaData blockMetaData = blockMetaDataIterator.next();
+      Iterator<ColumnChunkMetaData> columnChunkMetaDataIterator = blockMetaData.getColumns().iterator();
 
-        Iterator<ColumnChunkMetaData> columnChunkMetaDataIterator = blockMetaData.getColumns().iterator();
+      if (columnChunkMetaDataIterator.hasNext()) {
+        ColumnChunkMetaData columnChunkMetaData = columnChunkMetaDataIterator.next();
 
-        if (columnChunkMetaDataIterator.hasNext()) {
-          ColumnChunkMetaData columnChunkMetaData = columnChunkMetaDataIterator.next();
-
-          return columnChunkMetaData.getCodec();
-        }
+        return columnChunkMetaData.getCodec();
       }
     }
 
     return null;
   }
 
+  public MessageType readParquetSchema() {
+    try {
+      ParquetMetadata parquetMetadata = getParquetMetadata();
+
+      return parquetMetadata.getFileMetaData().getSchema();
+    } finally {
+      close();
+    }
+  }
+
+  private ParquetMetadata getParquetMetadata() {
+    return getFooters().stream().findFirst().get().getParquetMetadata();
+  }
+
   private List<Footer> getFooters() {
     final List<Footer> footers;
     try {
@@ -163,7 +176,8 @@ public class ParquetReader implements AutoCloseable {
       if (reader != null) {
         reader.close();
       }
-      this.reader = AvroParquetReader.<GenericRecord>builder(file).build();
+      GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
+      this.reader = AvroParquetReader.<GenericRecord>builder(file).withDataModel(GenericData.get()).build();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
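Putting the reader changes together, a sketch of asserting the schema of an imported directory with this test utility; the path is a placeholder:

    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.OriginalType;
    import org.apache.sqoop.util.ParquetReader;

    public class ParquetSchemaCheckSketch {
      public static void main(String[] args) {
        Path tableDir = new Path("/tmp/sqoop-warehouse/NUMERIC_TYPES"); // placeholder
        MessageType schema = new ParquetReader(tableDir).readParquetSchema();
        // With decimal support enabled, numeric columns carry the DECIMAL annotation:
        System.out.println(schema.getFields().get(0).getOriginalType() == OriginalType.DECIMAL);
      }
    }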