Spark: Support custom data location (#6)
author    mccheah <mcheah@palantir.com>
Tue, 11 Dec 2018 20:36:59 +0000 (12:36 -0800)
committer Ryan Blue <rdblue@users.noreply.github.com>
Tue, 11 Dec 2018 20:36:59 +0000 (12:36 -0800)
This adds a new table property, write.folder-storage.path, that controls where new data files are written. When the property is unset, new files default to a "data" folder underneath the table's root location; files written before the property is set are not relocated.
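As a rough usage sketch (not part of this commit; the table path, data directory, and the pre-existing `schema` and `df` are illustrative assumptions), setting the property on a HadoopTables table and appending through the Spark source looks like this:

    // Illustrative sketch only: "/tmp/warehouse/test_table" and "/tmp/custom-data-dir"
    // are assumed paths; `schema` and `df` are assumed to exist already.
    HadoopTables tables = new HadoopTables(new Configuration());
    Table table = tables.create(schema, PartitionSpec.unpartitioned(), "/tmp/warehouse/test_table");

    // Only files written after this property is committed land in the new directory;
    // previously written files stay where they are.
    table.updateProperties()
        .set(TableProperties.WRITE_NEW_DATA_LOCATION, "/tmp/custom-data-dir")
        .commit();

    // Appending through the Spark source now writes data files under /tmp/custom-data-dir
    // instead of /tmp/warehouse/test_table/data.
    df.write().format("iceberg").mode("append").save("/tmp/warehouse/test_table");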

core/src/main/java/com/netflix/iceberg/TableProperties.java
spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java
spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java
spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java

core/src/main/java/com/netflix/iceberg/TableProperties.java
index e522f84..0d99c7e 100644 (file)
@@ -67,6 +67,11 @@ public class TableProperties {
 
   public static final String OBJECT_STORE_PATH = "write.object-storage.path";
 
+  // This only applies to files written after this property is set. Files previously written aren't relocated to
+  // reflect this parameter.
+  // If not set, defaults to a "data" folder underneath the root path of the table.
+  public static final String WRITE_NEW_DATA_LOCATION = "write.folder-storage.path";
+
   public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
   public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = false;
 }
spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
index bed2cf6..c9d3a7b 100644 (file)
@@ -32,6 +32,7 @@ import com.netflix.iceberg.Metrics;
 import com.netflix.iceberg.PartitionSpec;
 import com.netflix.iceberg.Schema;
 import com.netflix.iceberg.Table;
+import com.netflix.iceberg.TableProperties;
 import com.netflix.iceberg.avro.Avro;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.hadoop.HadoopInputFile;
@@ -164,7 +165,9 @@ class Writer implements DataSourceWriter {
   }
 
   private String dataLocation() {
-    return new Path(new Path(table.location()), "data").toString();
+    return table.properties().getOrDefault(
+        TableProperties.WRITE_NEW_DATA_LOCATION,
+        new Path(new Path(table.location()), "data").toString());
   }
 
   @Override
spark/src/test/java/com/netflix/iceberg/spark/data/AvroDataTest.java
index f84c6fe..bc74908 100644 (file)
@@ -38,7 +38,7 @@ public abstract class AvroDataTest {
 
   protected abstract void writeAndValidate(Schema schema) throws IOException;
 
-  private static final StructType SUPPORTED_PRIMITIVES = StructType.of(
+  protected static final StructType SUPPORTED_PRIMITIVES = StructType.of(
       required(100, "id", LongType.get()),
       optional(101, "data", Types.StringType.get()),
       required(102, "b", Types.BooleanType.get()),
spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java
index 3b0d32b..05f8f80 100644 (file)
@@ -32,10 +32,12 @@ import com.netflix.iceberg.io.FileAppender;
 import com.netflix.iceberg.spark.data.AvroDataTest;
 import com.netflix.iceberg.spark.data.RandomData;
 import com.netflix.iceberg.spark.data.SparkAvroReader;
+import com.netflix.iceberg.types.Types;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -43,10 +45,12 @@ import org.apache.spark.sql.catalyst.InternalRow;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
 
 import static com.netflix.iceberg.spark.SparkSchemaUtil.convert;
@@ -57,7 +61,7 @@ import static com.netflix.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
 public class TestDataFrameWrites extends AvroDataTest {
   private static final Configuration CONF = new Configuration();
 
-  private String format = null;
+  private final String format;
 
   @Parameterized.Parameters
   public static Object[][] parameters() {
@@ -90,23 +94,43 @@ public class TestDataFrameWrites extends AvroDataTest {
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
+    File location = createTableFolder();
+    Table table = createTable(schema, location);
+    writeAndValidateWithLocations(table, location, new File(location, "data"));
+  }
+
+  @Test
+  public void testWriteWithCustomDataLocation() throws IOException {
+    File location = createTableFolder();
+    File tablePropertyDataLocation = temp.newFolder("test-table-property-data-dir");
+    Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()), location);
+    table.updateProperties().set(
+        TableProperties.WRITE_NEW_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath()).commit();
+    writeAndValidateWithLocations(table, location, tablePropertyDataLocation);
+  }
+
+  private File createTableFolder() throws IOException {
     File parent = temp.newFolder("parquet");
     File location = new File(parent, "test");
     Assert.assertTrue("Mkdir should succeed", location.mkdirs());
+    return location;
+  }
 
+  private Table createTable(Schema schema, File location) {
     HadoopTables tables = new HadoopTables(CONF);
-    Table table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
+    return tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
+  }
+
+  private void writeAndValidateWithLocations(Table table, File location, File expectedDataDir) throws IOException {
     Schema tableSchema = table.schema(); // use the table schema because ids are reassigned
 
     table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
 
     List<Record> expected = RandomData.generateList(tableSchema, 100, 0L);
     Dataset<Row> df = createDataset(expected, tableSchema);
+    DataFrameWriter<?> writer = df.write().format("iceberg").mode("append");
 
-    df.write()
-        .format("iceberg")
-        .mode("append")
-        .save(location.toString());
+    writer.save(location.toString());
 
     table.refresh();
 
@@ -120,6 +144,14 @@ public class TestDataFrameWrites extends AvroDataTest {
     for (int i = 0; i < expected.size(); i += 1) {
       assertEqualsSafe(tableSchema.asStruct(), expected.get(i), actual.get(i));
     }
+
+    table.currentSnapshot().addedFiles().forEach(dataFile ->
+        Assert.assertTrue(
+            String.format(
+                "File should have the parent directory %s, but has: %s.",
+                expectedDataDir.getAbsolutePath(),
+                dataFile.path()),
+            URI.create(dataFile.path().toString()).getPath().startsWith(expectedDataDir.getAbsolutePath())));
   }
 
   private Dataset<Row> createDataset(List<Record> records, Schema schema) throws IOException {
spark/src/test/java/com/netflix/iceberg/spark/source/TestParquetWrite.java
index 4f71ead..a2d105d 100644 (file)
@@ -71,7 +71,6 @@ public class TestParquetWrite {
   public void testBasicWrite() throws IOException {
     File parent = temp.newFolder("parquet");
     File location = new File(parent, "test");
-    location.mkdirs();
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();