Allow custom hadoop properties to be loaded in the Spark data source (#7)
authormccheah <mcheah@palantir.com>
Fri, 14 Dec 2018 18:04:17 +0000 (10:04 -0800)
committerRyan Blue <rdblue@users.noreply.github.com>
Fri, 14 Dec 2018 18:04:17 +0000 (10:04 -0800)
Properties that start with iceberg.hadoop are copied into the Hadoop Configuration used in the Spark source. These may be set in table properties or in read and write options passed to the Spark operation. Read and write options take precedence over the table properties.

Supporting these custom Hadoop properties should also be done in other Iceberg integrations in subsequent patches.

spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
spark/src/test/java/com/netflix/iceberg/spark/source/TestIcebergSource.java

index 7daa330..cd1a0af 100644 (file)
@@ -39,6 +39,7 @@ import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.types.StructType;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Optional;
 
 import static com.netflix.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
@@ -56,16 +57,18 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
 
   @Override
   public DataSourceReader createReader(DataSourceOptions options) {
-    Table table = findTable(options);
-    return new Reader(table, lazyConf());
+    Configuration conf = new Configuration(lazyBaseConf());
+    Table table = getTableAndResolveHadoopConfiguration(options, conf);
+
+    return new Reader(table, conf);
   }
 
   @Override
   public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct, SaveMode mode,
                                                    DataSourceOptions options) {
     Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
-
-    Table table = findTable(options);
+    Configuration conf = new Configuration(lazyBaseConf());
+    Table table = getTableAndResolveHadoopConfiguration(options, conf);
 
     Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct);
     List<String> errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema);
@@ -89,30 +92,49 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
           .toUpperCase(Locale.ENGLISH));
     }
 
-    return Optional.of(new Writer(table, lazyConf(), format));
+    return Optional.of(new Writer(table, conf, format));
   }
 
-  protected Table findTable(DataSourceOptions options) {
+  protected Table findTable(DataSourceOptions options, Configuration conf) {
     Optional<String> location = options.get("path");
     Preconditions.checkArgument(location.isPresent(),
         "Cannot open table without a location: path is not set");
 
-    HadoopTables tables = new HadoopTables(lazyConf());
+    HadoopTables tables = new HadoopTables(conf);
 
     return tables.load(location.get());
   }
 
-  protected SparkSession lazySparkSession() {
+  private SparkSession lazySparkSession() {
     if (lazySpark == null) {
       this.lazySpark = SparkSession.builder().getOrCreate();
     }
     return lazySpark;
   }
 
-  protected Configuration lazyConf() {
+  private Configuration lazyBaseConf() {
     if (lazyConf == null) {
       this.lazyConf = lazySparkSession().sparkContext().hadoopConfiguration();
     }
     return lazyConf;
   }
+
+  private Table getTableAndResolveHadoopConfiguration(
+      DataSourceOptions options, Configuration conf) {
+    // Overwrite configurations from the Spark Context with configurations from the options.
+    mergeIcebergHadoopConfs(conf, options.asMap());
+    Table table = findTable(options, conf);
+    // Set confs from table properties
+    mergeIcebergHadoopConfs(conf, table.properties());
+    // Re-overwrite values set in options and table properties but were not in the environment.
+    mergeIcebergHadoopConfs(conf, options.asMap());
+    return table;
+  }
+
+  private static void mergeIcebergHadoopConfs(
+      Configuration baseConf, Map<String, String> options) {
+    options.keySet().stream()
+        .filter(key -> key.startsWith("iceberg.hadoop"))
+        .forEach(key -> baseConf.set(key.replaceFirst("iceberg.hadoop", ""), options.get(key)));
+  }
 }
index a544162..357671b 100644 (file)
@@ -20,6 +20,7 @@
 package com.netflix.iceberg.spark.source;
 
 import com.netflix.iceberg.Table;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 
 public class TestIcebergSource extends IcebergSource {
@@ -29,7 +30,7 @@ public class TestIcebergSource extends IcebergSource {
   }
 
   @Override
-  protected Table findTable(DataSourceOptions options) {
+  protected Table findTable(DataSourceOptions options, Configuration conf) {
     return TestTables.load(options.get("iceberg.table.name").get());
   }
 }