Add pluggable file I/O submodule in TableOperations (#14)
authormccheah <mcheah@palantir.com>
Tue, 11 Dec 2018 17:14:45 +0000 (09:14 -0800)
committerRyan Blue <rdblue@users.noreply.github.com>
Tue, 11 Dec 2018 17:14:45 +0000 (09:14 -0800)
This adds FileIO that is returned by TableOperations and used to delete paths and to create InputFile and OutputFile instances. FileIO is Serializable so that it can be sent to tasks running in different JVMs and used for all file-related tasks for a table.

20 files changed:
api/src/main/java/com/netflix/iceberg/Files.java
core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
core/src/main/java/com/netflix/iceberg/BaseTableScan.java
core/src/main/java/com/netflix/iceberg/BaseTransaction.java
core/src/main/java/com/netflix/iceberg/FileIO.java [new file with mode: 0644]
core/src/main/java/com/netflix/iceberg/ManifestGroup.java
core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
core/src/main/java/com/netflix/iceberg/SnapshotParser.java
core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
core/src/main/java/com/netflix/iceberg/TableOperations.java
core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java [new file with mode: 0644]
core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
core/src/main/java/com/netflix/iceberg/hadoop/SerializableConfiguration.java [new file with mode: 0644]
core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
core/src/test/java/com/netflix/iceberg/TestTables.java
data/src/main/java/com/netflix/iceberg/data/TableScanIterable.java
hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java
spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java

index f739751..e85825a 100644 (file)
@@ -29,6 +29,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.file.Paths;
 
 public class Files {
 
@@ -37,7 +38,7 @@ public class Files {
   }
 
   public static OutputFile localOutput(String file) {
-    return localOutput(new File(file));
+    return localOutput(Paths.get(file).toAbsolutePath().toFile());
   }
 
   private static class LocalOutputFile implements OutputFile {
@@ -53,6 +54,13 @@ public class Files {
         throw new AlreadyExistsException("File already exists: %s", file);
       }
 
+      if (!file.getParentFile().isDirectory() && !file.getParentFile().mkdirs()) {
+        throw new RuntimeIOException(
+            String.format(
+                "Failed to create the file's directory at %s.",
+                file.getParentFile().getAbsolutePath()));
+      }
+
       try {
         return new PositionFileOutputStream(new RandomAccessFile(file, "rw"));
       } catch (FileNotFoundException e) {
index 81452d4..79a1337 100644 (file)
 package com.netflix.iceberg;
 
 import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
-import com.netflix.iceberg.hadoop.HadoopOutputFile;
-import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.hadoop.HadoopFileIO;
 import com.netflix.iceberg.io.OutputFile;
 import com.netflix.iceberg.util.Tasks;
 import org.apache.hadoop.conf.Configuration;
@@ -53,6 +51,7 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
   private static final String HIVE_LOCATION_FOLDER_NAME = "empty";
 
   private final Configuration conf;
+  private final FileIO fileIo;
 
   private TableMetadata currentMetadata = null;
   private String currentMetadataLocation = null;
@@ -62,6 +61,7 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
 
   protected BaseMetastoreTableOperations(Configuration conf) {
     this.conf = conf;
+    this.fileIo = new HadoopFileIO(conf);
   }
 
   @Override
@@ -88,22 +88,18 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
     return String.format("%s/%s", baseLocation, HIVE_LOCATION_FOLDER_NAME);
   }
 
-  public String dataLocation() {
-    return String.format("%s/%s", baseLocation, DATA_FOLDER_NAME);
-  }
-
   protected String writeNewMetadata(TableMetadata metadata, int version) {
     if (baseLocation == null) {
       baseLocation = metadata.location();
     }
 
-    String newFilename = newTableMetadataFilename(baseLocation, version);
-    OutputFile newMetadataLocation = HadoopOutputFile.fromPath(new Path(newFilename), conf);
+    String newTableMetadataFilePath = newTableMetadataFilePath(baseLocation, version);
+    OutputFile newMetadataLocation = fileIo.newOutputFile(newTableMetadataFilePath);
 
     // write the new metadata
     TableMetadataParser.write(metadata, newMetadataLocation);
 
-    return newFilename;
+    return newTableMetadataFilePath;
   }
 
   protected void refreshFromMetadataLocation(String newLocation) {
@@ -129,24 +125,13 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
   }
 
   @Override
-  public InputFile newInputFile(String path) {
-    return fromLocation(path, conf);
+  public String metadataFileLocation(String fileName) {
+    return String.format("%s/%s/%s", baseLocation, METADATA_FOLDER_NAME, fileName);
   }
 
   @Override
-  public OutputFile newMetadataFile(String filename) {
-    return HadoopOutputFile.fromPath(
-        new Path(newMetadataLocation(baseLocation, filename)), conf);
-  }
-
-  @Override
-  public void deleteFile(String file) {
-    Path path = new Path(file);
-    try {
-      getFS(path, conf).delete(path, false /* should be a file, not recursive */ );
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
+  public FileIO io() {
+    return fileIo;
   }
 
   @Override
@@ -154,7 +139,7 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
     return System.currentTimeMillis();
   }
 
-  private String newTableMetadataFilename(String baseLocation, int newVersion) {
+  private String newTableMetadataFilePath(String baseLocation, int newVersion) {
     return String.format("%s/%s/%05d-%s%s",
             baseLocation,
             METADATA_FOLDER_NAME,
@@ -163,22 +148,6 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
             getFileExtension(this.conf));
   }
 
-  private static String newMetadataLocation(String baseLocation, String filename) {
-    return String.format("%s/%s/%s", baseLocation, METADATA_FOLDER_NAME, filename);
-  }
-
-  private static String parseBaseLocation(String metadataLocation) {
-    int lastSlash = metadataLocation.lastIndexOf('/');
-    int secondToLastSlash = metadataLocation.lastIndexOf('/', lastSlash);
-
-    // verify that the metadata file was contained in a "metadata" folder
-    String parentFolderName = metadataLocation.substring(secondToLastSlash + 1, lastSlash);
-    Preconditions.checkArgument(METADATA_FOLDER_NAME.equals(parentFolderName),
-        "Invalid metadata location, not in metadata/ folder: %s", metadataLocation);
-
-    return metadataLocation.substring(0, secondToLastSlash);
-  }
-
   private static int parseVersion(String metadataLocation) {
     int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
     int versionEnd = metadataLocation.indexOf('-', versionStart);
index 945ddbb..36a873a 100644 (file)
@@ -50,7 +50,7 @@ class BaseSnapshot implements Snapshot {
                String... manifestFiles) {
     this(ops, snapshotId, null, System.currentTimeMillis(),
         Lists.transform(Arrays.asList(manifestFiles),
-            path -> new GenericManifestFile(ops.newInputFile(path), 0)));
+            path -> new GenericManifestFile(ops.io().newInputFile(path), 0)));
   }
 
   BaseSnapshot(TableOperations ops,
@@ -139,7 +139,7 @@ class BaseSnapshot implements Snapshot {
     // accumulate adds and deletes from all manifests.
     // because manifests can be reused in newer snapshots, filter the changes by snapshot id.
     for (String manifest : Iterables.transform(manifests, ManifestFile::path)) {
-      try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) {
+      try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest))) {
         for (ManifestEntry add : reader.addedFiles()) {
           if (add.snapshotId() == snapshotId) {
             adds.add(add.file().copy());
index ad20780..8915461 100644 (file)
@@ -34,7 +34,6 @@ import com.netflix.iceberg.expressions.Binder;
 import com.netflix.iceberg.expressions.Expression;
 import com.netflix.iceberg.expressions.Expressions;
 import com.netflix.iceberg.expressions.InclusiveManifestEvaluator;
-import com.netflix.iceberg.expressions.Projections;
 import com.netflix.iceberg.expressions.ResidualEvaluator;
 import com.netflix.iceberg.io.CloseableIterable;
 import com.netflix.iceberg.types.TypeUtil;
@@ -177,7 +176,7 @@ class BaseTableScan implements TableScan {
       Iterable<Iterable<FileScanTask>> readers = Iterables.transform(
           matchingManifests,
           manifest -> {
-            ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
+            ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()));
             toClose.add(reader);
             String schemaString = SchemaParser.toJson(reader.spec().schema());
             String specString = PartitionSpecParser.toJson(reader.spec());
index a860117..1a56b7e 100644 (file)
@@ -23,8 +23,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.exceptions.CommitFailedException;
-import com.netflix.iceberg.io.InputFile;
-import com.netflix.iceberg.io.OutputFile;
 import com.netflix.iceberg.util.Tasks;
 import java.util.List;
 import java.util.Map;
@@ -263,18 +261,13 @@ class BaseTransaction implements Transaction {
     }
 
     @Override
-    public InputFile newInputFile(String path) {
-      return ops.newInputFile(path);
+    public FileIO io() {
+      return ops.io();
     }
 
     @Override
-    public OutputFile newMetadataFile(String filename) {
-      return ops.newMetadataFile(filename);
-    }
-
-    @Override
-    public void deleteFile(String path) {
-      ops.deleteFile(path);
+    public String metadataFileLocation(String fileName) {
+      return ops.metadataFileLocation(fileName);
     }
 
     @Override
diff --git a/core/src/main/java/com/netflix/iceberg/FileIO.java b/core/src/main/java/com/netflix/iceberg/FileIO.java
new file mode 100644 (file)
index 0000000..fdba7af
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.io.OutputFile;
+
+import java.io.Serializable;
+
+/**
+ * Pluggable module for reading, writing, and deleting files.
+ * <p>
+ * Both table metadata files and data files can be written and read by this module. Implementations
+ * must be serializable because various clients of Spark tables may initialize this once and pass
+ * it off to a separate module that would then interact with the streams.
+ */
+public interface FileIO extends Serializable {
+
+  /**
+   * Get a {@link InputFile} instance to read bytes from the file at the given path.
+   */
+  InputFile newInputFile(String path);
+
+  /**
+   * Get a {@link OutputFile} instance to write bytes to the file at the given path.
+   */
+  OutputFile newOutputFile(String path);
+
+  /**
+   * Delete the file at the given path.
+   */
+  void deleteFile(String path);
+}
index 19d993f..d05ceca 100644 (file)
@@ -107,7 +107,7 @@ class ManifestGroup {
     Iterable<Iterable<ManifestEntry>> readers = Iterables.transform(
         matchingManifests,
         manifest -> {
-          ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
+          ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()));
           FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
           toClose.add(reader);
           return Iterables.filter(
index 8878d4c..156f9ea 100644 (file)
@@ -316,7 +316,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
       return manifest;
     }
 
-    try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
+    try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
       Expression inclusiveExpr = Projections
           .inclusive(reader.spec())
           .project(deleteExpression);
@@ -463,7 +463,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
     try {
 
       for (ManifestFile manifest : bin) {
-        try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
+        try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
           for (ManifestEntry entry : reader.entries()) {
             if (entry.status() == Status.DELETED) {
               // suppress deletes from previous snapshots. only files deleted by this snapshot
index 9ce6981..541cc5f 100644 (file)
@@ -50,7 +50,7 @@ class RemoveSnapshots implements ExpireSnapshots {
   private final Consumer<String> defaultDelete = new Consumer<String>() {
     @Override
     public void accept(String file) {
-      ops.deleteFile(file);
+      ops.io().deleteFile(file);
     }
   };
 
@@ -164,7 +164,7 @@ class RemoveSnapshots implements ExpireSnapshots {
         ).run(manifest -> {
           // even if the manifest is still used, it may contain files that can be deleted
           // TODO: eliminate manifests with no deletes without scanning
-          try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
+          try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
             for (ManifestEntry entry : reader.entries()) {
               // if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
               if (entry.status() == ManifestEntry.Status.DELETED &&
index a5ce08c..d73da8a 100644 (file)
@@ -22,12 +22,9 @@ package com.netflix.iceberg;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.util.JsonUtil;
-import com.netflix.iceberg.util.Tasks;
-import com.netflix.iceberg.util.ThreadPools;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.List;
@@ -92,13 +89,13 @@ public class SnapshotParser {
     if (node.has(MANIFEST_LIST)) {
       // the manifest list is stored in a manifest list file
       String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
-      return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.newInputFile(manifestList));
+      return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.io().newInputFile(manifestList));
 
     } else {
       // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
       // loaded lazily, if it is needed
       List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
-          location -> new GenericManifestFile(ops.newInputFile(location), 0));
+          location -> new GenericManifestFile(ops.io().newInputFile(location), 0));
       return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests);
     }
   }
index 54c0483..ce9d59c 100644 (file)
@@ -132,7 +132,7 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
 
       return new BaseSnapshot(ops,
           snapshotId(), parentSnapshotId, System.currentTimeMillis(),
-          ops.newInputFile(manifestList.location()));
+          ops.io().newInputFile(manifestList.location()));
 
     } else {
       return new BaseSnapshot(ops,
@@ -188,16 +188,17 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
   }
 
   protected void deleteFile(String path) {
-    ops.deleteFile(path);
+    ops.io().deleteFile(path);
   }
 
   protected OutputFile manifestListPath() {
-    return ops.newMetadataFile(FileFormat.AVRO.addExtension(
-        String.format("snap-%d-%s", snapshotId(), commitUUID)));
+    return ops.io().newOutputFile(ops.metadataFileLocation(FileFormat.AVRO.addExtension(
+        String.format("snap-%d-%s", snapshotId(), commitUUID))));
   }
 
   protected OutputFile manifestPath(int i) {
-    return ops.newMetadataFile(FileFormat.AVRO.addExtension(commitUUID + "-m" + i));
+    return ops.io().newOutputFile(
+        ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + i)));
   }
 
   protected long snapshotId() {
@@ -208,7 +209,7 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
   }
 
   private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
-    try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
+    try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()))) {
       PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId()));
       int addedFiles = 0;
       int existingFiles = 0;
index a0c94b8..e9d4388 100644 (file)
@@ -19,7 +19,6 @@
 
 package com.netflix.iceberg;
 
-import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 
 /**
@@ -56,27 +55,18 @@ public interface TableOperations {
   void commit(TableMetadata base, TableMetadata metadata);
 
   /**
-   * Create a new {@link InputFile} for a path.
-   *
-   * @param path a string file path
-   * @return an InputFile instance for the path
+   * @return a {@link com.netflix.iceberg.FileIO} to read and write table data and metadata files
    */
-  InputFile newInputFile(String path);
+  FileIO io();
 
   /**
-   * Create a new {@link OutputFile} in the table's metadata store.
-   *
-   * @param filename a string file name, not a full path
-   * @return an OutputFile instance for the path
-   */
-  OutputFile newMetadataFile(String filename);
-
-  /**
-   * Delete a file.
-   *
-   * @param path path to the file
+   * Given the name of a metadata file, obtain the full path of that file using an appropriate base
+   * location of the implementation's choosing.
+   * <p>
+   * The file may not exist yet, in which case the path should be returned as if it were to be created
+   * by e.g. {@link FileIO#newOutputFile(String)}.
    */
-  void deleteFile(String path);
+  String metadataFileLocation(String fileName);
 
   /**
    * Create a new ID for a Snapshot
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopFileIO.java
new file mode 100644 (file)
index 0000000..586942c
--- /dev/null
@@ -0,0 +1,43 @@
+package com.netflix.iceberg.hadoop;
+
+import com.netflix.iceberg.FileIO;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.io.OutputFile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+public class HadoopFileIO implements FileIO {
+
+  private final SerializableConfiguration hadoopConf;
+
+  public HadoopFileIO(Configuration hadoopConf) {
+    this.hadoopConf = new SerializableConfiguration(hadoopConf);
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    return HadoopInputFile.fromLocation(path, hadoopConf.get());
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return HadoopOutputFile.fromPath(new Path(path), hadoopConf.get());
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    Path toDelete = new Path(path);
+    FileSystem fs = Util.getFS(toDelete, hadoopConf.get());
+    try {
+      fs.delete(toDelete, false /* not recursive */);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to delete file: %s", path);
+    }
+  }
+}
index 875643e..d953056 100644 (file)
 
 package com.netflix.iceberg.hadoop;
 
+import com.netflix.iceberg.FileIO;
 import com.netflix.iceberg.TableMetadata;
 import com.netflix.iceberg.TableMetadataParser;
 import com.netflix.iceberg.TableOperations;
 import com.netflix.iceberg.exceptions.CommitFailedException;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.exceptions.ValidationException;
-import com.netflix.iceberg.io.InputFile;
-import com.netflix.iceberg.io.OutputFile;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -53,6 +52,7 @@ public class HadoopTableOperations implements TableOperations {
   private TableMetadata currentMetadata = null;
   private Integer version = null;
   private boolean shouldRefresh = true;
+  private HadoopFileIO defaultFileIo = null;
 
   protected HadoopTableOperations(Path location, Configuration conf) {
     this.conf = conf;
@@ -91,7 +91,7 @@ public class HadoopTableOperations implements TableOperations {
     }
     this.version = ver;
     this.currentMetadata = TableMetadataParser.read(this,
-        HadoopInputFile.fromPath(metadataFile, conf));
+        io().newInputFile(metadataFile.toString()));
     this.shouldRefresh = false;
     return currentMetadata;
   }
@@ -108,7 +108,7 @@ public class HadoopTableOperations implements TableOperations {
     }
 
     Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + getFileExtension(conf));
-    TableMetadataParser.write(metadata, HadoopOutputFile.fromPath(tempMetadataFile, conf));
+    TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));
 
     int nextVersion = (version != null ? version : 0) + 1;
     Path finalMetadataFile = metadataFile(nextVersion);
@@ -142,24 +142,16 @@ public class HadoopTableOperations implements TableOperations {
   }
 
   @Override
-  public InputFile newInputFile(String path) {
-    return HadoopInputFile.fromPath(new Path(path), conf);
-  }
-
-  @Override
-  public OutputFile newMetadataFile(String filename) {
-    return HadoopOutputFile.fromPath(metadataPath(filename), conf);
+  public FileIO io() {
+    if (defaultFileIo == null) {
+      defaultFileIo = new HadoopFileIO(conf);
+    }
+    return defaultFileIo;
   }
 
   @Override
-  public void deleteFile(String path) {
-    Path toDelete = new Path(path);
-    FileSystem fs = getFS(toDelete, conf);
-    try {
-      fs.delete(toDelete, false /* not recursive */ );
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to delete file: %s", path);
-    }
+  public String metadataFileLocation(String fileName) {
+    return metadataPath(fileName).toString();
   }
 
   @Override
@@ -194,7 +186,7 @@ public class HadoopTableOperations implements TableOperations {
   private int readVersionHint() {
     Path versionHintFile = versionHintFile();
     try {
-      FileSystem fs = versionHintFile.getFileSystem(conf);
+      FileSystem fs = Util.getFS(versionHintFile, conf);
       if (!fs.exists(versionHintFile)) {
         return 0;
       }
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/SerializableConfiguration.java b/core/src/main/java/com/netflix/iceberg/hadoop/SerializableConfiguration.java
new file mode 100644 (file)
index 0000000..30c7563
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.hadoop;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Wraps a {@link Configuration} object in a {@link Serializable} layer.
+ */
+public class SerializableConfiguration implements Serializable {
+
+  private transient Configuration hadoopConf;
+
+  public SerializableConfiguration(Configuration hadoopCOnf) {
+    this.hadoopConf = hadoopCOnf;
+  }
+
+  private void writeObject(ObjectOutputStream out) throws IOException {
+    out.defaultWriteObject();
+    hadoopConf.write(out);
+  }
+
+  private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
+    in.defaultReadObject();
+    hadoopConf = new Configuration(false);
+    hadoopConf.readFields(in);
+  }
+
+  public Configuration get() {
+    return hadoopConf;
+  }
+}
index 27a01fc..1508ee8 100644 (file)
 
 package com.netflix.iceberg;
 
+import com.google.common.collect.Maps;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
-import com.netflix.iceberg.io.InputFile;
-import com.netflix.iceberg.io.OutputFile;
+import java.util.Map;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.io.IOException;
 
-import static com.netflix.iceberg.Files.localInput;
-
 class LocalTableOperations implements TableOperations {
   private final TemporaryFolder temp;
+  private final FileIO io;
+
+  private final Map<String, String> createdMetadataFilePaths = Maps.newHashMap();
 
   LocalTableOperations(TemporaryFolder temp) {
     this.temp = temp;
+    this.io = new TestTables.LocalFileIO();
   }
 
   @Override
@@ -52,25 +53,19 @@ class LocalTableOperations implements TableOperations {
   }
 
   @Override
-  public InputFile newInputFile(String path) {
-    return localInput(path);
-  }
-
-  @Override
-  public OutputFile newMetadataFile(String filename) {
-    try {
-      File metadataFile = temp.newFile(filename);
-      metadataFile.delete();
-      metadataFile.deleteOnExit();
-      return Files.localOutput(metadataFile);
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
+  public FileIO io() {
+    return io;
   }
 
   @Override
-  public void deleteFile(String path) {
-    new File(path).delete();
+  public String metadataFileLocation(String fileName) {
+    return createdMetadataFilePaths.computeIfAbsent(fileName, name -> {
+      try {
+        return temp.newFile(name).getAbsolutePath();
+      } catch (IOException e) {
+        throw new RuntimeIOException(e);
+      }
+    });
   }
 
   @Override
index fcb9bfc..e6aea02 100644 (file)
@@ -27,6 +27,7 @@ import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import java.io.File;
+import java.io.IOException;
 import java.util.Map;
 
 import static com.netflix.iceberg.TableMetadata.newTableMetadata;
@@ -174,13 +175,33 @@ class TestTables {
     }
 
     @Override
+    public FileIO io() {
+      return new LocalFileIO();
+    }
+
+    @Override
+    public String metadataFileLocation(String fileName) {
+      return new File(metadata, fileName).getAbsolutePath();
+    }
+
+    @Override
+    public long newSnapshotId() {
+      long nextSnapshotId = lastSnapshotId + 1;
+      this.lastSnapshotId = nextSnapshotId;
+      return nextSnapshotId;
+    }
+  }
+
+  static class LocalFileIO implements FileIO {
+
+    @Override
     public InputFile newInputFile(String path) {
       return Files.localInput(path);
     }
 
     @Override
-    public OutputFile newMetadataFile(String filename) {
-      return Files.localOutput(new File(metadata, filename));
+    public OutputFile newOutputFile(String path) {
+      return Files.localOutput(path);
     }
 
     @Override
@@ -189,12 +210,5 @@ class TestTables {
         throw new RuntimeIOException("Failed to delete file: " + path);
       }
     }
-
-    @Override
-    public long newSnapshotId() {
-      long nextSnapshotId = lastSnapshotId + 1;
-      this.lastSnapshotId = nextSnapshotId;
-      return nextSnapshotId;
-    }
   }
 }
index a506b4e..4f59556 100644 (file)
@@ -79,7 +79,7 @@ class TableScanIterable extends CloseableGroup implements CloseableIterable<Reco
   }
 
   private CloseableIterable<Record> open(FileScanTask task) {
-    InputFile input = ops.newInputFile(task.file().path().toString());
+    InputFile input = ops.io().newInputFile(task.file().path().toString());
 
     // TODO: join to partition data from the manifest file
     switch (task.file().format()) {
index 38a0cb0..0199e7f 100644 (file)
@@ -161,7 +161,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     } finally {
       if (threw) {
         // if anything went wrong, clean up the uncommitted metadata file
-        deleteFile(newMetadataLocation);
+        io().deleteFile(newMetadataLocation);
       }
       unlock(lockId);
     }
index a7ff513..90b6dc8 100644 (file)
@@ -21,6 +21,7 @@ package com.netflix.iceberg.spark.source;
 
 import com.google.common.collect.Maps;
 import com.netflix.iceberg.BaseTable;
+import com.netflix.iceberg.FileIO;
 import com.netflix.iceberg.Files;
 import com.netflix.iceberg.PartitionSpec;
 import com.netflix.iceberg.Schema;
@@ -33,6 +34,7 @@ import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
 import java.io.File;
+import java.io.IOException;
 import java.util.Map;
 
 // TODO: Use the copy of this from core.
@@ -154,15 +156,33 @@ class TestTables {
     }
 
     @Override
+    public FileIO io() {
+      return new LocalFileIO();
+    }
+
+    @Override
+    public String metadataFileLocation(String fileName) {
+      return new File(new File(current.location(), "metadata"), fileName).getAbsolutePath();
+    }
+
+    @Override
+    public long newSnapshotId() {
+      long nextSnapshotId = lastSnapshotId + 1;
+      this.lastSnapshotId = nextSnapshotId;
+      return nextSnapshotId;
+    }
+  }
+  
+  static class LocalFileIO implements FileIO {
+
+    @Override
     public InputFile newInputFile(String path) {
       return Files.localInput(path);
     }
 
     @Override
-    public OutputFile newMetadataFile(String filename) {
-      File metadata = new File(current.location(), "metadata");
-      metadata.mkdirs();
-      return Files.localOutput(new File(metadata, filename));
+    public OutputFile newOutputFile(String path) {
+      return Files.localOutput(new File(path));
     }
 
     @Override
@@ -171,12 +191,5 @@ class TestTables {
         throw new RuntimeIOException("Failed to delete file: " + path);
       }
     }
-
-    @Override
-    public long newSnapshotId() {
-      long nextSnapshotId = lastSnapshotId + 1;
-      this.lastSnapshotId = nextSnapshotId;
-      return nextSnapshotId;
-    }
   }
 }