Use the FileIO submodule in Spark writers and readers. (#52)
author mccheah <mcheah@palantir.com>
Wed, 9 Jan 2019 00:55:54 +0000 (16:55 -0800)
committer Ryan Blue <rdblue@users.noreply.github.com>
Wed, 9 Jan 2019 00:55:54 +0000 (16:55 -0800)
14 files changed:
api/src/main/java/com/netflix/iceberg/Table.java
api/src/main/java/com/netflix/iceberg/io/FileIO.java [moved from core/src/main/java/com/netflix/iceberg/FileIO.java with 98% similarity]
core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
core/src/main/java/com/netflix/iceberg/BaseTable.java
core/src/main/java/com/netflix/iceberg/BaseTransaction.java
core/src/main/java/com/netflix/iceberg/TableOperations.java
core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java
core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
core/src/test/java/com/netflix/iceberg/TestTables.java
spark/src/main/java/com/netflix/iceberg/spark/source/IcebergSource.java
spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java

index fe19fa2..9acb50d 100644 (file)
@@ -19,6 +19,7 @@
 
 package com.netflix.iceberg;
 
+import com.netflix.iceberg.io.FileIO;
 import java.util.Map;
 
 /**
@@ -171,4 +172,10 @@ public interface Table {
    * @return a new {@link Transaction}
    */
   Transaction newTransaction();
+
+  /**
+   * Returns the {@link FileIO} for this table.
+   *
+   * @return a {@link FileIO} to read and write table data and metadata files
+   */
+  FileIO io();
+
 }
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package com.netflix.iceberg;
+package com.netflix.iceberg.io;
 
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
index b107d0b..3e9b420 100644 (file)
@@ -22,6 +22,7 @@ package com.netflix.iceberg;
 import com.google.common.base.Objects;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.hadoop.HadoopFileIO;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.io.OutputFile;
 import com.netflix.iceberg.util.Tasks;
 import org.apache.hadoop.conf.Configuration;
index da11b55..7d48ef2 100644 (file)
@@ -19,6 +19,7 @@
 
 package com.netflix.iceberg;
 
+import com.netflix.iceberg.io.FileIO;
 import java.util.Map;
 
 /**
@@ -136,6 +137,11 @@ public class BaseTable implements Table, HasTableOperations {
   }
 
   @Override
+  public FileIO io() {
+    // delegate to the FileIO exposed by this table's TableOperations
+    return operations().io();
+  }
+
+  @Override
   public String toString() {
     return name;
   }
index 1a56b7e..b7c3a32 100644 (file)
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.exceptions.CommitFailedException;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.util.Tasks;
 import java.util.List;
 import java.util.Map;
@@ -365,5 +366,10 @@ class BaseTransaction implements Transaction {
     public Transaction newTransaction() {
       throw new UnsupportedOperationException("Cannot create a transaction within a transaction");
     }
+
+    @Override
+    public FileIO io() {
+      // file access inside a transaction goes through the transaction's operations
+      return transactionOps.io();
+    }
   }
 }
index 19fc386..974d5e2 100644 (file)
 
 package com.netflix.iceberg;
 
+import com.netflix.iceberg.io.FileIO;
 import java.util.UUID;
 
-import com.netflix.iceberg.io.OutputFile;
-
 /**
  * SPI interface to abstract table metadata access and updates.
  */
@@ -57,7 +56,7 @@ public interface TableOperations {
   void commit(TableMetadata base, TableMetadata metadata);
 
   /**
-   * @return a {@link com.netflix.iceberg.FileIO} to read and write table data and metadata files
+   * @return a {@link FileIO} to read and write table data and metadata files
    */
   FileIO io();
 
index 586942c..7e1d004 100644 (file)
@@ -1,6 +1,6 @@
 package com.netflix.iceberg.hadoop;
 
-import com.netflix.iceberg.FileIO;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
@@ -9,8 +9,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 
 public class HadoopFileIO implements FileIO {
 
index 4aa19f4..1a3b0fd 100644 (file)
@@ -19,7 +19,7 @@
 
 package com.netflix.iceberg.hadoop;
 
-import com.netflix.iceberg.FileIO;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.TableMetadata;
 import com.netflix.iceberg.TableMetadataParser;
 import com.netflix.iceberg.TableOperations;
index 1508ee8..baa286f 100644 (file)
@@ -21,6 +21,7 @@ package com.netflix.iceberg;
 
 import com.google.common.collect.Maps;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.FileIO;
 import java.util.Map;
 import org.junit.rules.TemporaryFolder;
 
index f1dbe4a..fbb58d4 100644 (file)
@@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
 import com.netflix.iceberg.exceptions.AlreadyExistsException;
 import com.netflix.iceberg.exceptions.CommitFailedException;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import java.io.File;
index cd1a0af..1991d29 100644 (file)
@@ -59,8 +59,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
   public DataSourceReader createReader(DataSourceOptions options) {
     Configuration conf = new Configuration(lazyBaseConf());
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
-
-    return new Reader(table, conf);
+    // conf is only used to resolve the table; the Reader is constructed from the table alone
+    return new Reader(table);
   }
 
   @Override
@@ -92,7 +91,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
           .toUpperCase(Locale.ENGLISH));
     }
 
-    return Optional.of(new Writer(table, conf, format));
+    return Optional.of(new Writer(table, format));
   }
 
   protected Table findTable(DataSourceOptions options, Configuration conf) {
index 4a008ee..33b95c1 100644 (file)
@@ -22,6 +22,7 @@ package com.netflix.iceberg.spark.source;
 import com.google.common.collect.Lists;
 import com.netflix.iceberg.CombinedScanTask;
 import com.netflix.iceberg.DataFile;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.FileScanTask;
 import com.netflix.iceberg.PartitionField;
 import com.netflix.iceberg.PartitionSpec;
@@ -34,7 +35,6 @@ import com.netflix.iceberg.avro.Avro;
 import com.netflix.iceberg.common.DynMethods;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.expressions.Expression;
-import com.netflix.iceberg.hadoop.HadoopInputFile;
 import com.netflix.iceberg.io.CloseableIterable;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.parquet.Parquet;
@@ -44,7 +44,6 @@ import com.netflix.iceberg.spark.data.SparkAvroReader;
 import com.netflix.iceberg.spark.data.SparkParquetReaders;
 import com.netflix.iceberg.types.TypeUtil;
 import com.netflix.iceberg.types.Types;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
@@ -67,7 +66,6 @@ import org.apache.spark.sql.types.StringType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
-import org.apache.spark.util.SerializableConfiguration;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
@@ -89,7 +87,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
   private static final Filter[] NO_FILTERS = new Filter[0];
 
   private final Table table;
-  private final SerializableConfiguration conf;
+  private final FileIO fileIo;
   private StructType requestedSchema = null;
   private List<Expression> filterExpressions = null;
   private Filter[] pushedFilters = NO_FILTERS;
@@ -99,10 +97,10 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
   private StructType type = null; // cached because Spark accesses it multiple times
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
 
-  Reader(Table table, Configuration conf) {
+  /**
+   * Creates a reader over {@code table}, capturing the table's schema and {@link FileIO}.
+   *
+   * @param table the table to read
+   */
+  Reader(Table table) {
     this.table = table;
-    this.conf = new SerializableConfiguration(conf);
     this.schema = table.schema();
+    this.fileIo = table.io();
   }
 
   private Schema lazySchema() {
@@ -135,7 +133,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
     for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, conf));
+      readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo));
     }
 
     return readTasks;
@@ -228,22 +226,22 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
     private final CombinedScanTask task;
     private final String tableSchemaString;
     private final String expectedSchemaString;
-    private final SerializableConfiguration conf;
+    private final FileIO fileIo;
 
     private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
 
-    private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
-                     SerializableConfiguration conf) {
+    /**
+     * Creates a read task for one combined scan task.
+     *
+     * @param task the combined scan task to read
+     * @param tableSchemaString the table schema in string form
+     * @param expectedSchemaString the expected (projected) schema in string form
+     * @param fileIo the {@link FileIO} handed to the partition reader created by this task
+     */
+    private ReadTask(
+        CombinedScanTask task, String tableSchemaString, String expectedSchemaString, FileIO fileIo) {
       this.task = task;
       this.tableSchemaString = tableSchemaString;
       this.expectedSchemaString = expectedSchemaString;
-      this.conf = conf;
+      // NOTE(review): this replaces a SerializableConfiguration field; the FileIO is stored as-is,
+      // so it presumably has to be serializable itself - confirm
+      this.fileIo = fileIo;
     }
 
     @Override
     public InputPartitionReader<InternalRow> createPartitionReader() {
-      return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), conf.value());
+      // the data reader receives this task's FileIO instead of a Hadoop Configuration
+      return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), fileIo);
     }
 
     private Schema lazyTableSchema() {
@@ -270,18 +268,18 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
     private final Iterator<FileScanTask> tasks;
     private final Schema tableSchema;
     private final Schema expectedSchema;
-    private final Configuration conf;
+    private final FileIO fileIo;
 
     private Iterator<InternalRow> currentIterator = null;
     private Closeable currentCloseable = null;
     private InternalRow current = null;
 
-    public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, Configuration conf) {
+    /**
+     * Creates a reader over the files of {@code task} and opens the first file.
+     *
+     * @param task the combined scan task whose files are read in order
+     * @param tableSchema the table schema
+     * @param expectedSchema the expected (projected) schema
+     * @param fileIo the {@link FileIO} used to open each data file
+     */
+    public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo) {
+      this.fileIo = fileIo;
       this.tasks = task.files().iterator();
       this.tableSchema = tableSchema;
       this.expectedSchema = expectedSchema;
-      this.conf = conf;
-      // open last because the schemas and conf must be set
+      // open last because the schemas and fileIo must be set
+      // NOTE(review): tasks.next() assumes the combined task contains at least one file - confirm
       this.currentIterator = open(tasks.next());
     }
 
@@ -346,17 +344,17 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
 
         // create joined rows and project from the joined schema to the final schema
         iterSchema = TypeUtil.join(readSchema, partitionSchema);
-        iter = transform(open(task, readSchema, conf), joined::withLeft);
+        iter = transform(open(task, readSchema), joined::withLeft);
 
       } else if (hasExtraFilterColumns) {
         // add projection to the final schema
         iterSchema = requiredSchema;
-        iter = open(task, requiredSchema, conf);
+        iter = open(task, requiredSchema);
 
       } else {
         // return the base iterator
         iterSchema = finalSchema;
-        iter = open(task, finalSchema, conf);
+        iter = open(task, finalSchema);
       }
 
       // TODO: remove the projection by reporting the iterator's schema back to Spark
@@ -386,9 +384,8 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
           asScalaBufferConverter(attrs).asScala().toSeq());
     }
 
-    private Iterator<InternalRow> open(FileScanTask task, Schema readSchema,
-                                       Configuration conf) {
-      InputFile location = HadoopInputFile.fromLocation(task.file().path(), conf);
+    private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
+      InputFile location = fileIo.newInputFile(task.file().path().toString());
       CloseableIterable<InternalRow> iter;
       switch (task.file().format()) {
         case PARQUET:
index c9d3a7b..902ba80 100644 (file)
@@ -28,6 +28,7 @@ import com.netflix.iceberg.AppendFiles;
 import com.netflix.iceberg.DataFile;
 import com.netflix.iceberg.DataFiles;
 import com.netflix.iceberg.FileFormat;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.Metrics;
 import com.netflix.iceberg.PartitionSpec;
 import com.netflix.iceberg.Schema;
@@ -35,8 +36,6 @@ import com.netflix.iceberg.Table;
 import com.netflix.iceberg.TableProperties;
 import com.netflix.iceberg.avro.Avro;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
-import com.netflix.iceberg.hadoop.HadoopInputFile;
-import com.netflix.iceberg.hadoop.HadoopOutputFile;
 import com.netflix.iceberg.io.FileAppender;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
@@ -46,8 +45,6 @@ import com.netflix.iceberg.transforms.Transform;
 import com.netflix.iceberg.transforms.Transforms;
 import com.netflix.iceberg.types.Types.StringType;
 import com.netflix.iceberg.util.Tasks;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
@@ -55,7 +52,6 @@ import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-import org.apache.spark.util.SerializableConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.Closeable;
@@ -89,18 +85,18 @@ class Writer implements DataSourceWriter {
   private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
 
   private final Table table;
-  private final Configuration conf;
   private final FileFormat format;
+  private final FileIO fileIo;
 
-  Writer(Table table, Configuration conf, FileFormat format) {
+  /**
+   * Creates a writer for {@code table}, capturing the file format and the table's {@link FileIO}.
+   *
+   * @param table the table to write to
+   * @param format the file format passed on to the writer factory
+   */
+  Writer(Table table, FileFormat format) {
     this.table = table;
-    this.conf = conf;
     this.format = format;
+    this.fileIo = table.io();
   }
 
   @Override
   public DataWriterFactory<InternalRow> createWriterFactory() {
-    return new WriterFactory(table.spec(), format, dataLocation(), table.properties(), conf);
+    // the factory carries the table's FileIO instead of a Hadoop Configuration
+    return new WriterFactory(table.spec(), format, dataLocation(), table.properties(), fileIo);
   }
 
   @Override
@@ -122,13 +118,6 @@ class Writer implements DataSourceWriter {
 
   @Override
   public void abort(WriterCommitMessage[] messages) {
-    FileSystem fs;
-    try {
-      fs = new Path(table.location()).getFileSystem(conf);
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
-
     Tasks.foreach(files(messages))
         .retry(propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
         .exponentialBackoff(
@@ -138,11 +127,7 @@ class Writer implements DataSourceWriter {
             2.0 /* exponential */ )
         .throwFailureWhenFinished()
         .run(file -> {
-          try {
-            fs.delete(new Path(file.path().toString()), false /* not recursive */ );
-          } catch (IOException e) {
-            throw new RuntimeIOException(e);
-          }
+          fileIo.deleteFile(file.path().toString());
         });
   }
 
@@ -165,9 +150,10 @@ class Writer implements DataSourceWriter {
   }
 
+  /**
+   * Returns the location under which new data files are written: the value of the
+   * {@code WRITE_NEW_DATA_LOCATION} table property if present, otherwise
+   * {@code <table location>/data}. Trailing slashes are removed from the result.
+   */
   private String dataLocation() {
-    return table.properties().getOrDefault(
-        TableProperties.WRITE_NEW_DATA_LOCATION,
-        new Path(new Path(table.location()), "data").toString());
+    // NOTE(review): the default is built by plain string formatting, so a table location that
+    // itself ends with '/' produces a doubled separator before "data"
+    return stripTrailingSlash(
+        table.properties().getOrDefault(
+            TableProperties.WRITE_NEW_DATA_LOCATION,
+            String.format("%s/data", table.location())));
   }
 
   @Override
@@ -202,18 +188,16 @@ class Writer implements DataSourceWriter {
     private final FileFormat format;
     private final String dataLocation;
     private final Map<String, String> properties;
-    private final SerializableConfiguration conf;
     private final String uuid = UUID.randomUUID().toString();
-
-    private transient Path dataPath = null;
+    private final FileIO fileIo;
 
+    /**
+     * Creates a factory for data writers.
+     *
+     * @param spec the partition spec; an empty spec selects the unpartitioned writer
+     * @param format the file format of the written files
+     * @param dataLocation the base location for new data files
+     * @param properties the table properties
+     * @param fileIo the {@link FileIO} handed to every writer created by this factory
+     */
     WriterFactory(PartitionSpec spec, FileFormat format, String dataLocation,
-                  Map<String, String> properties, Configuration conf) {
+                  Map<String, String> properties, FileIO fileIo) {
       this.spec = spec;
       this.format = format;
       this.dataLocation = dataLocation;
       this.properties = properties;
-      this.conf = new SerializableConfiguration(conf);
+      // NOTE(review): the Configuration used to be wrapped in SerializableConfiguration; the
+      // FileIO is stored as-is, so it presumably has to be serializable itself - confirm
+      this.fileIo = fileIo;
     }
 
     @Override
@@ -221,12 +205,10 @@ class Writer implements DataSourceWriter {
       String filename = format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, uuid));
       AppenderFactory<InternalRow> factory = new SparkAppenderFactory();
       if (spec.fields().isEmpty()) {
-        return new UnpartitionedWriter(lazyDataPath(), filename, format, conf.value(), factory);
-
+        return new UnpartitionedWriter(dataLocation, filename, format, factory, fileIo);
       } else {
-        Path baseDataPath = lazyDataPath(); // avoid calling this in the output path function
-        Function<PartitionKey, Path> outputPathFunc = key ->
-            new Path(new Path(baseDataPath, key.toPath()), filename);
+        Function<PartitionKey, String> outputPathFunc = key ->
+            String.format("%s/%s/%s", dataLocation, key.toPath(), filename);
 
         boolean useObjectStorage = (
             Boolean.parseBoolean(properties.get(OBJECT_STORE_ENABLED)) ||
@@ -235,43 +217,46 @@ class Writer implements DataSourceWriter {
 
         if (useObjectStorage) {
           // try to get db and table portions of the path for context in the object store
-          String context = pathContext(baseDataPath);
-          String objectStore = properties.get(OBJECT_STORE_PATH);
+          String context = pathContext(new Path(dataLocation));
+          String objectStore = stripTrailingSlash(properties.get(OBJECT_STORE_PATH));
           Preconditions.checkNotNull(objectStore,
               "Cannot use object storage, missing location: " + OBJECT_STORE_PATH);
-          Path objectStorePath = new Path(objectStore);
 
           outputPathFunc = key -> {
-            String partitionAndFilename = key.toPath() + "/" + filename;
+            String partitionAndFilename = String.format("%s/%s", key.toPath(), filename);
             int hash = HASH_FUNC.apply(partitionAndFilename);
-            return new Path(objectStorePath,
-                String.format("%08x/%s/%s", hash, context, partitionAndFilename));
+            return String.format(
+                "%s/%08x/%s/%s/%s",
+                objectStore,
+                hash,
+                context,
+                key.toPath(),
+                filename);
           };
         }
 
-        return new PartitionedWriter(spec, format, conf.value(), factory, outputPathFunc);
+        return new PartitionedWriter(spec, format, factory, outputPathFunc, fileIo);
       }
     }
 
+    /**
+     * Builds a short context string from the last components of {@code dataPath}.
+     *
+     * <p>A final component named {@code data} is dropped by recursing on the parent. Otherwise
+     * the result is {@code <parent name>/<name>}, or just the path's name when it has no parent.
+     *
+     * @param dataPath the data location as a Hadoop {@link Path}
+     * @return the context string, which never ends with a slash
+     * @throws IllegalStateException if the resolved context ends with a slash
+     */
     private static String pathContext(Path dataPath) {
       Path parent = dataPath.getParent();
+      String resolvedContext;
       if (parent != null) {
         // remove the data folder
         if (dataPath.getName().equals("data")) {
-          return pathContext(parent);
+          resolvedContext = pathContext(parent);
+        } else {
+          resolvedContext = String.format("%s/%s", parent.getName(), dataPath.getName());
         }
-
-        return parent.getName() + "/" + dataPath.getName();
+      } else {
+        resolvedContext = dataPath.getName();
       }
 
-      return dataPath.getName();
-    }
-
-    private Path lazyDataPath() {
-      if (dataPath == null) {
-        this.dataPath = new Path(dataLocation);
-      }
-      return dataPath;
+      // the context is joined into output locations with '/' separators, so a trailing slash
+      // here would produce a doubled separator
+      Preconditions.checkState(
+          !resolvedContext.endsWith("/"),
+          "Path context must not end with a slash.");
+      return resolvedContext;
     }
 
     private class SparkAppenderFactory implements AppenderFactory<InternalRow> {
@@ -314,16 +299,20 @@ class Writer implements DataSourceWriter {
   }
 
   private static class UnpartitionedWriter implements DataWriter<InternalRow>, Closeable {
-    private final Path file;
-    private final Configuration conf;
+    private final FileIO fileIo;
+    private final String file;
     private FileAppender<InternalRow> appender = null;
     private Metrics metrics = null;
 
-    UnpartitionedWriter(Path dataPath, String filename, FileFormat format,
-                        Configuration conf, AppenderFactory<InternalRow> factory) {
-      this.file = new Path(dataPath, filename);
-      this.appender = factory.newAppender(HadoopOutputFile.fromPath(file, conf), format);
-      this.conf = conf;
+    /**
+     * Creates a writer for a single file at {@code <dataPath>/<filename>} and opens its appender.
+     *
+     * @param dataPath the base data location
+     * @param filename the name of the file to create under {@code dataPath}
+     * @param format the file format passed to the appender factory
+     * @param factory creates the appender for the output file
+     * @param fileIo the {@link FileIO} used to create, read back, and delete the file
+     */
+    UnpartitionedWriter(
+        String dataPath,
+        String filename,
+        FileFormat format,
+        AppenderFactory<InternalRow> factory,
+        FileIO fileIo) {
+      // joined with a single '/'; presumably dataPath arrives without a trailing slash - confirm
+      this.file = String.format("%s/%s", dataPath, filename);
+      this.fileIo = fileIo;
+      this.appender = factory.newAppender(fileIo.newOutputFile(file), format);
     }
 
     @Override
@@ -338,12 +327,11 @@ class Writer implements DataSourceWriter {
       close();
 
       if (metrics.recordCount() == 0L) {
-        FileSystem fs = file.getFileSystem(conf);
-        fs.delete(file, false);
+        fileIo.deleteFile(file);
         return new TaskCommit();
       }
 
-      InputFile inFile = HadoopInputFile.fromPath(file, conf);
+      InputFile inFile = fileIo.newInputFile(file);
       DataFile dataFile = DataFiles.fromInputFile(inFile, null, metrics);
 
       return new TaskCommit(dataFile);
@@ -354,9 +342,7 @@ class Writer implements DataSourceWriter {
       Preconditions.checkArgument(appender != null, "Abort called on a closed writer: %s", this);
 
       close();
-
-      FileSystem fs = file.getFileSystem(conf);
-      fs.delete(file, false);
+      fileIo.deleteFile(file);
     }
 
     @Override
@@ -374,24 +360,27 @@ class Writer implements DataSourceWriter {
     private final List<DataFile> completedFiles = Lists.newArrayList();
     private final PartitionSpec spec;
     private final FileFormat format;
-    private final Configuration conf;
     private final AppenderFactory<InternalRow> factory;
-    private final Function<PartitionKey, Path> outputPathFunc;
+    private final Function<PartitionKey, String> outputPathFunc;
     private final PartitionKey key;
+    private final FileIO fileIo;
 
     private PartitionKey currentKey = null;
     private FileAppender<InternalRow> currentAppender = null;
-    private Path currentPath = null;
-
-    PartitionedWriter(PartitionSpec spec, FileFormat format, Configuration conf,
-                      AppenderFactory<InternalRow> factory,
-                      Function<PartitionKey, Path> outputPathFunc) {
+    /**
+     * Creates a writer for partitioned output.
+     *
+     * @param spec the partition spec used to build the partition key
+     * @param format the file format passed to the appender factory
+     * @param factory creates an appender for each output file
+     * @param outputPathFunc maps a partition key to the location string of its output file
+     * @param fileIo the {@link FileIO} used to create, read back, and delete files
+     */
+    PartitionedWriter(
+        PartitionSpec spec,
+        FileFormat format,
+        AppenderFactory<InternalRow> factory,
+        Function<PartitionKey, String> outputPathFunc,
+        FileIO fileIo) {
       this.spec = spec;
       this.format = format;
-      this.conf = conf;
       this.factory = factory;
       this.outputPathFunc = outputPathFunc;
       this.key = new PartitionKey(spec);
+      this.fileIo = fileIo;
     }
 
     @Override
@@ -410,7 +399,7 @@ class Writer implements DataSourceWriter {
 
         this.currentKey = key.copy();
         this.currentPath = outputPathFunc.apply(currentKey);
-        OutputFile file = HadoopOutputFile.fromPath(currentPath, conf);
+        OutputFile file = fileIo.newOutputFile(currentPath.toString());
         this.currentAppender = factory.newAppender(file, format);
       }
 
@@ -425,18 +414,16 @@ class Writer implements DataSourceWriter {
 
     @Override
     public void abort() throws IOException {
-      FileSystem fs = currentPath.getFileSystem(conf);
-
       // clean up files created by this writer
       Tasks.foreach(completedFiles)
           .throwFailureWhenFinished()
           .noRetry()
-          .run(file -> fs.delete(new Path(file.path().toString())), IOException.class);
+          .run(file -> fileIo.deleteFile(file.path().toString()));
 
+      // then close and delete the file that was still open, if any
       if (currentAppender != null) {
         currentAppender.close();
         this.currentAppender = null;
-        fs.delete(currentPath);
+        fileIo.deleteFile(currentPath);
       }
     }
 
@@ -447,7 +434,7 @@ class Writer implements DataSourceWriter {
         Metrics metrics = currentAppender.metrics();
         this.currentAppender = null;
 
-        InputFile inFile = HadoopInputFile.fromPath(currentPath, conf);
+        InputFile inFile = fileIo.newInputFile(currentPath);
         DataFile dataFile = DataFiles.builder(spec)
             .withInputFile(inFile)
             .withPartition(currentKey)
@@ -459,4 +446,12 @@ class Writer implements DataSourceWriter {
       }
     }
   }
+
+  /**
+   * Removes every trailing {@code '/'} from a location string so that child paths can be
+   * appended with exactly one separator.
+   *
+   * @param path a location string; may be null
+   * @return {@code path} without trailing slashes, or null when {@code path} is null
+   */
+  private static String stripTrailingSlash(String path) {
+    if (path == null) {
+      // pass null through so callers can report a missing location with their own message
+      return null;
+    }
+
+    // scan from the end by index; measuring the original string on every pass would stop
+    // shrinking the result after the first slash and loop forever on "a//"
+    int end = path.length();
+    while (end > 0 && path.charAt(end - 1) == '/') {
+      end -= 1;
+    }
+    return path.substring(0, end);
+  }
 }
index 90b6dc8..c18636f 100644 (file)
@@ -21,7 +21,7 @@ package com.netflix.iceberg.spark.source;
 
 import com.google.common.collect.Maps;
 import com.netflix.iceberg.BaseTable;
-import com.netflix.iceberg.FileIO;
+import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.Files;
 import com.netflix.iceberg.PartitionSpec;
 import com.netflix.iceberg.Schema;
@@ -34,7 +34,6 @@ import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import java.io.File;
-import java.io.IOException;
 import java.util.Map;
 
 // TODO: Use the copy of this from core.
@@ -73,7 +72,8 @@ class TestTables {
       this.ops = ops;
     }
 
-    TestTableOperations ops() {
+    // NOTE(review): presumably overrides BaseTable.operations() with a narrower return type so
+    // tests get TestTableOperations without a cast - confirm
+    @Override
+    public TestTableOperations operations() {
       return ops;
     }
   }