Use a UUID-based approach to generate snapshot ids (#57)
author: Anton Okolnychyi <aokolnychyi@apple.com>
Fri, 21 Dec 2018 21:58:12 +0000 (23:58 +0200)
committer: Ryan Blue <rdblue@users.noreply.github.com>
Fri, 21 Dec 2018 21:58:12 +0000 (13:58 -0800)
build.gradle
core/src/main/java/com/netflix/iceberg/BaseMetastoreTableOperations.java
core/src/main/java/com/netflix/iceberg/TableOperations.java
core/src/main/java/com/netflix/iceberg/hadoop/HadoopTableOperations.java
hive/src/main/java/com/netflix/iceberg/hive/HiveTableOperations.java
hive/src/test/java/com/netflix/iceberg/hive/HiveTablesTest.java

index 096156b..f6be711 100644 (file)
@@ -138,18 +138,24 @@ project(':iceberg-data') {
 }
 
 project(':iceberg-hive') {
-    dependencies {
-      compile project(':iceberg-core')
+  dependencies {
+    compile project(':iceberg-core')
 
-      compileOnly "org.apache.hive:hive-standalone-metastore:$hiveVersion"
+    compileOnly "org.apache.avro:avro:$avroVersion"
 
-      testCompile "org.apache.hive:hive-exec:3.1.0"
+    compileOnly("org.apache.hive:hive-standalone-metastore:$hiveVersion") {
+      exclude group: 'org.apache.avro', module: 'avro'
+    }
 
-      compileOnly('org.apache.hadoop:hadoop-client:2.7.3') {
-        exclude group: 'org.apache.avro', module: 'avro'
-        exclude group: 'org.slf4j', module: 'slf4j-log4j12'
-      }
+    testCompile("org.apache.hive:hive-exec:$hiveVersion") {
+      exclude group: 'org.apache.avro', module: 'avro'
     }
+
+    compileOnly("org.apache.hadoop:hadoop-client:$hadoopVersion") {
+      exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+    }
+  }
 }
 
 project(':iceberg-orc') {
index 79a1337..b107d0b 100644 (file)
@@ -134,11 +134,6 @@ public abstract class BaseMetastoreTableOperations implements TableOperations {
     return fileIo;
   }
 
-  @Override
-  public long newSnapshotId() {
-    return System.currentTimeMillis();
-  }
-
   private String newTableMetadataFilePath(String baseLocation, int newVersion) {
     return String.format("%s/%s/%05d-%s%s",
             baseLocation,
index e9d4388..19fc386 100644 (file)
@@ -19,6 +19,8 @@
 
 package com.netflix.iceberg;
 
+import java.util.UUID;
+
 import com.netflix.iceberg.io.OutputFile;
 
 /**
@@ -73,6 +75,11 @@ public interface TableOperations {
    *
    * @return a long snapshot ID
    */
-  long newSnapshotId();
+  default long newSnapshotId() {
+    UUID uuid = UUID.randomUUID();
+    long mostSignificantBits = uuid.getMostSignificantBits();
+    long leastSignificantBits = uuid.getLeastSignificantBits();
+    return (mostSignificantBits ^ leastSignificantBits) & Long.MAX_VALUE;
+  }
 
 }
index d953056..4aa19f4 100644 (file)
@@ -154,11 +154,6 @@ public class HadoopTableOperations implements TableOperations {
     return metadataPath(fileName).toString();
   }
 
-  @Override
-  public long newSnapshotId() {
-    return System.currentTimeMillis();
-  }
-
   private Path metadataFile(int version) {
     return metadataPath("v" + version + getFileExtension(conf));
   }
index 0199e7f..3709370 100644 (file)
@@ -57,6 +57,9 @@ import static java.lang.String.format;
 /**
  * TODO we should be able to extract some more commonalities to BaseMetastoreTableOperations to
  * avoid code duplication between this class and Metacat Tables.
+ *
+ * Note! This class is not thread-safe as {@link ThriftHiveMetastore.Client} does not behave
+ * correctly in a multi-threaded environment.
  */
 public class HiveTableOperations extends BaseMetastoreTableOperations {
   private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class);
index a6e6f7a..6226a43 100644 (file)
  */
 package com.netflix.iceberg.hive;
 
+import com.netflix.iceberg.DataFile;
+import com.netflix.iceberg.DataFiles;
+import com.netflix.iceberg.FileFormat;
 import com.netflix.iceberg.exceptions.CommitFailedException;
 import com.netflix.iceberg.types.Types;
+import com.netflix.iceberg.util.Tasks;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.thrift.TException;
 import org.junit.Assert;
@@ -24,11 +28,14 @@ import org.junit.Test;
 
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.netflix.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
 import static com.netflix.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
 import static com.netflix.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
+import static com.netflix.iceberg.util.ThreadPools.getWorkerPool;
 
 public class HiveTablesTest extends HiveTableBaseTest {
   @Test
@@ -87,4 +94,38 @@ public class HiveTablesTest extends HiveTableBaseTest {
             .addColumn("data", Types.LongType.get())
             .commit();
   }
+
+  @Test
+  public void testConcurrentFastAppends() {
+    HiveTables hiveTables = new HiveTables(hiveConf);
+    com.netflix.iceberg.Table icebergTable = hiveTables.load(DB_NAME, TABLE_NAME);
+    com.netflix.iceberg.Table anotherIcebergTable = hiveTables.load(DB_NAME, TABLE_NAME);
+
+    String fileName = UUID.randomUUID().toString();
+    DataFile file = DataFiles.builder(icebergTable.spec())
+      .withPath(FileFormat.PARQUET.addExtension(fileName))
+      .withRecordCount(2)
+      .withFileSizeInBytes(0)
+      .build();
+
+    Tasks.foreach(icebergTable, anotherIcebergTable)
+      .stopOnFailure().throwFailureWhenFinished()
+      .executeWith(getWorkerPool())
+      .run(table -> {
+        for (int numCommittedFiles = 0; numCommittedFiles < 10; numCommittedFiles++) {
+          long commitStartTime = System.currentTimeMillis();
+          table.newFastAppend().appendFile(file).commit();
+          long commitEndTime = System.currentTimeMillis();
+          long commitDuration = commitEndTime - commitStartTime;
+          try {
+            TimeUnit.MILLISECONDS.sleep(200 - commitDuration);
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+    icebergTable.refresh();
+    Assert.assertEquals(20, icebergTable.currentSnapshot().manifests().size());
+  }
 }