Fix commit retry with manfiest lists. (#48)
authorRyan Blue <rdblue@users.noreply.github.com>
Thu, 13 Dec 2018 21:18:37 +0000 (13:18 -0800)
committerGitHub <noreply@github.com>
Thu, 13 Dec 2018 21:18:37 +0000 (13:18 -0800)
A manifest list is created for every commit attempt. Before this update,
the same file was used, which caused retries to fail trying to create
the same list file. This uses a new location for every manifest list,
keeps track of old lists, and cleans up unused lists after a commit
succeeds.

core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
core/src/test/java/com/netflix/iceberg/TestFastAppend.java

index ce9d59c..796df2f 100644 (file)
@@ -22,6 +22,7 @@ package com.netflix.iceberg;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.netflix.iceberg.exceptions.CommitFailedException;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
@@ -35,6 +36,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static com.netflix.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
@@ -70,6 +72,8 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
 
   private final TableOperations ops;
   private final String commitUUID = UUID.randomUUID().toString();
+  private final AtomicInteger attempt = new AtomicInteger(0);
+  private final List<String> manifestLists = Lists.newArrayList();
   private Long snapshotId = null;
   private TableMetadata base = null;
 
@@ -110,7 +114,11 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
       OutputFile manifestList = manifestListPath();
 
       try (ManifestListWriter writer = new ManifestListWriter(
-          manifestListPath(), snapshotId(), parentSnapshotId)) {
+          manifestList, snapshotId(), parentSnapshotId)) {
+
+        // keep track of the manifest lists created
+        manifestLists.add(manifestList.location());
+
         ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
 
         Tasks.range(manifestFiles.length)
@@ -172,6 +180,12 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
       Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
       if (saved != null) {
         cleanUncommitted(Sets.newHashSet(saved.manifests()));
+        // also clean up unused manifest lists created by multiple attempts
+        for (String manifestList : manifestLists) {
+          if (!saved.manifestListLocation().equals(manifestList)) {
+            ops.io().deleteFile(manifestList);
+          }
+        }
       } else {
         // saved may not be present if the latest metadata couldn't be loaded due to eventual
         // consistency problems in refresh. in that case, don't clean up.
@@ -184,6 +198,10 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
   }
 
   protected void cleanAll() {
+    for (String manifestList : manifestLists) {
+      ops.io().deleteFile(manifestList);
+    }
+    manifestLists.clear();
     cleanUncommitted(EMPTY_SET);
   }
 
@@ -193,7 +211,7 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
 
   protected OutputFile manifestListPath() {
     return ops.io().newOutputFile(ops.metadataFileLocation(FileFormat.AVRO.addExtension(
-        String.format("snap-%d-%s", snapshotId(), commitUUID))));
+        String.format("snap-%d-%d-%s", snapshotId(), attempt.incrementAndGet(), commitUUID))));
   }
 
   protected OutputFile manifestPath(int i) {
index 4d9e174..88252bb 100644 (file)
@@ -171,7 +171,32 @@ public class TestFastAppend extends TableTestBase {
   }
 
   @Test
-  public void testRecovery() {
+  public void testRecoveryWithManifestList() {
+    table.updateProperties().set(TableProperties.MANIFEST_LISTS_ENABLED, "true").commit();
+
+    // inject 3 failures, the last try will succeed
+    TestTables.TestTableOperations ops = table.ops();
+    ops.failCommits(3);
+
+    AppendFiles append = table.newFastAppend().appendFile(FILE_B);
+    Snapshot pending = append.apply();
+    ManifestFile newManifest = pending.manifests().get(0);
+    Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
+
+    append.commit();
+
+    TableMetadata metadata = readMetadata();
+
+    validateSnapshot(null, metadata.currentSnapshot(), FILE_B);
+    Assert.assertTrue("Should commit same new manifest", new File(newManifest.path()).exists());
+    Assert.assertTrue("Should commit the same new manifest",
+        metadata.currentSnapshot().manifests().contains(newManifest));
+  }
+
+  @Test
+  public void testRecoveryWithoutManifestList() {
+    table.updateProperties().set(TableProperties.MANIFEST_LISTS_ENABLED, "false").commit();
+
     // inject 3 failures, the last try will succeed
     TestTables.TestTableOperations ops = table.ops();
     ops.failCommits(3);