Support customizing table locations (#68)
authorRyan Blue <rdblue@users.noreply.github.com>
Mon, 14 Jan 2019 21:45:55 +0000 (13:45 -0800)
committerGitHub <noreply@github.com>
Mon, 14 Jan 2019 21:45:55 +0000 (13:45 -0800)
* Add write.metadata.path to relocate metadata.
* Add UpdateLocation API to change a table's base location.
* Remove empty folder from Hive locations.

12 files changed:
api/src/main/java/com/netflix/iceberg/Table.java
api/src/main/java/com/netflix/iceberg/Transaction.java
api/src/main/java/com/netflix/iceberg/UpdateLocation.java [new file with mode: 0644]
core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
core/src/main/java/com/netflix/iceberg/BaseTable.java
core/src/main/java/com/netflix/iceberg/BaseTransaction.java
core/src/main/java/com/netflix/iceberg/SetLocation.java [new file with mode: 0644]
core/src/main/java/com/netflix/iceberg/TableMetadata.java
core/src/main/java/com/netflix/iceberg/TableProperties.java
core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java
hive/src/test/java/com/netflix/iceberg/hive/HiveTableBaseTest.java

index 9acb50d..1b61b84 100644 (file)
@@ -98,6 +98,13 @@ public interface Table {
   UpdateProperties updateProperties();
 
   /**
+   * Create a new {@link UpdateLocation} to update table location and commit the changes.
+   *
+   * @return a new {@link UpdateLocation}
+   */
+  UpdateLocation updateLocation();
+
+  /**
    * Create a new {@link AppendFiles append API} to add files to this table and commit.
    *
    * @return a new {@link AppendFiles}
index 8ab4b64..22d7e5a 100644 (file)
@@ -48,6 +48,13 @@ public interface Transaction {
   UpdateProperties updateProperties();
 
   /**
+   * Create a new {@link UpdateLocation} to update table location.
+   *
+   * @return a new {@link UpdateLocation}
+   */
+  UpdateLocation updateLocation();
+
+  /**
    * Create a new {@link AppendFiles append API} to add files to this table.
    *
    * @return a new {@link AppendFiles}
diff --git a/api/src/main/java/com/netflix/iceberg/UpdateLocation.java b/api/src/main/java/com/netflix/iceberg/UpdateLocation.java
new file mode 100644 (file)
index 0000000..e508531
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+/**
+ * API for setting a table's base location.
+ */
+public interface UpdateLocation extends PendingUpdate<String> {
+  /**
+   * Set the table's location.
+   *
+   * @param location a String location
+   * @return this for method chaining
+   */
+  UpdateLocation setLocation(String location);
+}
index 3e9b420..5228e80 100644 (file)
@@ -49,7 +49,6 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
 
   private static final String METADATA_FOLDER_NAME = "metadata";
   private static final String DATA_FOLDER_NAME = "data";
-  private static final String HIVE_LOCATION_FOLDER_NAME = "empty";
 
   private final Configuration conf;
   private final FileIO fileIo;
@@ -57,7 +56,6 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
   private TableMetadata currentMetadata = null;
   private String currentMetadataLocation = null;
   private boolean shouldRefresh = true;
-  private String baseLocation = null;
   private int version = -1;
 
   protected BaseMetastoreTableOperations(Configuration conf) {
@@ -85,16 +83,8 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
     this.shouldRefresh = true;
   }
 
-  public String hiveTableLocation() {
-    return String.format("%s/%s", baseLocation, HIVE_LOCATION_FOLDER_NAME);
-  }
-
   protected String writeNewMetadata(TableMetadata metadata, int version) {
-    if (baseLocation == null) {
-      baseLocation = metadata.location();
-    }
-
-    String newTableMetadataFilePath = newTableMetadataFilePath(baseLocation, version);
+    String newTableMetadataFilePath = newTableMetadataFilePath(metadata, version);
     OutputFile newMetadataLocation = fileIo.newOutputFile(newTableMetadataFilePath);
 
     // write the new metadata
@@ -115,19 +105,29 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
       Tasks.foreach(newLocation)
           .retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */ )
           .suppressFailureWhenFinished()
-          .run(location -> {
-            this.currentMetadata = read(this, fromLocation(location, conf));
-            this.currentMetadataLocation = location;
-            this.baseLocation = currentMetadata.location();
-            this.version = parseVersion(location);
+          .run(metadataLocation -> {
+            this.currentMetadata = read(this, fromLocation(metadataLocation, conf));
+            this.currentMetadataLocation = metadataLocation;
+            this.version = parseVersion(metadataLocation);
           });
     }
     this.shouldRefresh = false;
   }
 
+  private String metadataFileLocation(TableMetadata metadata, String filename) {
+    String metadataLocation = metadata.properties()
+        .get(TableProperties.WRITE_METADATA_LOCATION);
+
+    if (metadataLocation != null) {
+      return String.format("%s/%s", metadataLocation, filename);
+    } else {
+      return String.format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename);
+    }
+  }
+
   @Override
-  public String metadataFileLocation(String fileName) {
-    return String.format("%s/%s/%s", baseLocation, METADATA_FOLDER_NAME, fileName);
+  public String metadataFileLocation(String filename) {
+    return metadataFileLocation(current(), filename);
   }
 
   @Override
@@ -135,13 +135,9 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
     return fileIo;
   }
 
-  private String newTableMetadataFilePath(String baseLocation, int newVersion) {
-    return String.format("%s/%s/%05d-%s%s",
-            baseLocation,
-            METADATA_FOLDER_NAME,
-            newVersion,
-            UUID.randomUUID(),
-            getFileExtension(this.conf));
+  private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
+    return metadataFileLocation(meta,
+        String.format("%05d-%s%s", newVersion, UUID.randomUUID(), getFileExtension(this.conf)));
   }
 
   private static int parseVersion(String metadataLocation) {
index 7d48ef2..ad0d324 100644 (file)
@@ -92,6 +92,11 @@ public class BaseTable implements Table, HasTableOperations {
   }
 
   @Override
+  public UpdateLocation updateLocation() {
+    return new SetLocation(ops);
+  }
+
+  @Override
   public AppendFiles newAppend() {
     return new MergeAppend(ops);
   }
index e7d9864..8caf45c 100644 (file)
@@ -116,6 +116,14 @@ class BaseTransaction implements Transaction {
   }
 
   @Override
+  public UpdateLocation updateLocation() {
+    checkLastOperationCommitted("UpdateLocation");
+    UpdateLocation setLocation = new SetLocation(transactionOps);
+    updates.add(setLocation);
+    return setLocation;
+  }
+
+  @Override
   public AppendFiles newAppend() {
     checkLastOperationCommitted("AppendFiles");
     AppendFiles append = new MergeAppend(transactionOps);
@@ -336,6 +344,11 @@ class BaseTransaction implements Transaction {
     }
 
     @Override
+    public UpdateLocation updateLocation() {
+      return BaseTransaction.this.updateLocation();
+    }
+
+    @Override
     public AppendFiles newAppend() {
       return BaseTransaction.this.newAppend();
     }
diff --git a/core/src/main/java/com/netflix/iceberg/SetLocation.java b/core/src/main/java/com/netflix/iceberg/SetLocation.java
new file mode 100644 (file)
index 0000000..be64971
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.netflix.iceberg.exceptions.CommitFailedException;
+import com.netflix.iceberg.util.Tasks;
+
+import static com.netflix.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static com.netflix.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static com.netflix.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static com.netflix.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+
+public class SetLocation implements UpdateLocation {
+  private final TableOperations ops;
+  private String newLocation;
+
+  public SetLocation(TableOperations ops) {
+    this.ops = ops;
+    this.newLocation = null;
+  }
+
+  @Override
+  public UpdateLocation setLocation(String location) {
+    this.newLocation = location;
+    return this;
+  }
+
+  @Override
+  public String apply() {
+    return newLocation;
+  }
+
+  @Override
+  public void commit() {
+    TableMetadata base = ops.refresh();
+    Tasks.foreach(ops)
+        .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+        .exponentialBackoff(
+            base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+            base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+            base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+            2.0 /* exponential */ )
+        .onlyRetryOn(CommitFailedException.class)
+        .run(ops -> ops.commit(base, base.updateLocation(newLocation)));
+  }
+}
index c949f13..645dff2 100644 (file)
@@ -428,6 +428,12 @@ public class TableMetadata {
         -1, snapshots, ImmutableList.of());
   }
 
+  public TableMetadata updateLocation(String newLocation) {
+    return new TableMetadata(ops, null, newLocation,
+        System.currentTimeMillis(), lastColumnId, schema, defaultSpecId, specs, properties,
+        currentSnapshotId, snapshots, snapshotLog);
+  }
+
   private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) {
     PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema)
         .withSpecId(specId);
index 69bfcf2..925900a 100644 (file)
@@ -67,11 +67,16 @@ public class TableProperties {
 
   public static final String OBJECT_STORE_PATH = "write.object-storage.path";
 
-  // This only applies to files written after this property is set. Files previously written aren't relocated to
-  // reflect this parameter.
+  // This only applies to files written after this property is set. Files previously written aren't
+  // relocated to reflect this parameter.
   // If not set, defaults to a "data" folder underneath the root path of the table.
   public static final String WRITE_NEW_DATA_LOCATION = "write.folder-storage.path";
 
+  // This only applies to files written after this property is set. Files previously written aren't
+  // relocated to reflect this parameter.
+  // If not set, defaults to a "meatdata" folder underneath the root path of the table.
+  public static final String WRITE_METADATA_LOCATION = "write.metadata.path";
+
   public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
   public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = true;
 }
index 1a3b0fd..e8c98dd 100644 (file)
 
 package com.netflix.iceberg.hadoop;
 
+import com.google.common.base.Preconditions;
 import com.netflix.iceberg.io.FileIO;
 import com.netflix.iceberg.TableMetadata;
 import com.netflix.iceberg.TableMetadataParser;
 import com.netflix.iceberg.TableOperations;
+import com.netflix.iceberg.TableProperties;
 import com.netflix.iceberg.exceptions.CommitFailedException;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.exceptions.ValidationException;
@@ -107,6 +109,12 @@ public class HadoopTableOperations implements TableOperations {
       return;
     }
 
+    Preconditions.checkArgument(base == null || base.location().equals(metadata.location()),
+        "Hadoop path-based tables cannot be relocated");
+    Preconditions.checkArgument(
+        !metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),
+        "Hadoop path-based tables cannot relocate metadata");
+
     Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + getFileExtension(conf));
     TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));
 
index 3709370..e0e2e19 100644 (file)
@@ -136,7 +136,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
                 (int) currentTimeMillis / 1000,
                 (int) currentTimeMillis / 1000,
                 Integer.MAX_VALUE,
-                storageDescriptor(metadata.schema()),
+                storageDescriptor(metadata),
                 Collections.emptyList(),
                 new HashMap<>(),
                 null,
@@ -144,7 +144,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
                 ICEBERG_TABLE_TYPE_VALUE);
       }
 
-      tbl.setSd(storageDescriptor(metadata.schema())); // set to pickup any schema changes
+      tbl.setSd(storageDescriptor(metadata)); // set to pickup any schema changes
       final String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
       if (!Objects.equals(currentMetadataLocation(), metadataLocation)) {
         throw new CommitFailedException(format("metadataLocation = %s is not same as table metadataLocation %s for %s.%s",
@@ -189,11 +189,11 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     tbl.setParameters(parameters);
   }
 
-  private StorageDescriptor storageDescriptor(Schema schema) {
+  private StorageDescriptor storageDescriptor(TableMetadata metadata) {
 
     final StorageDescriptor storageDescriptor = new StorageDescriptor();
-    storageDescriptor.setCols(columns(schema));
-    storageDescriptor.setLocation(hiveTableLocation());
+    storageDescriptor.setCols(columns(metadata.schema()));
+    storageDescriptor.setLocation(metadata.location());
     storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileInputFormat");
     storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileOutputFormat");
     SerDeInfo serDeInfo = new SerDeInfo();
index 8c2bfdc..f3955e2 100644 (file)
@@ -206,7 +206,7 @@ class HiveTableBaseTest {
   }
 
   String getTableLocation(String tableName) {
-    return new Path("file", null, Paths.get(getTableBasePath(tableName), "empty").toString()).toString();
+    return new Path("file", null, Paths.get(getTableBasePath(tableName)).toString()).toString();
   }
 
   String metadataLocation(String tableName) {