HIVE-16832 duplicate ROW__ID possible in multi insert into transactional table (Eugen...
authorEugene Koifman <ekoifman@hortonworks.com>
Wed, 12 Jul 2017 23:02:16 +0000 (16:02 -0700)
committerEugene Koifman <ekoifman@hortonworks.com>
Wed, 12 Jul 2017 23:02:16 +0000 (16:02 -0700)
33 files changed:
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/BucketIdResolverImpl.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorImpl.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestBucketIdResolverImpl.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java
ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java [new file with mode: 0644]
ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java
ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
ql/src/test/queries/clientpositive/acid_bucket_pruning.q
ql/src/test/results/clientpositive/acid_table_stats.q.out
ql/src/test/results/clientpositive/autoColumnStats_4.q.out
ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out
ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out
ql/src/test/results/clientpositive/row__id.q.out

index 231dc9f..d31d5a0 100644 (file)
@@ -1321,6 +1321,9 @@ public class HiveConf extends Configuration {
     HIVETESTMODEROLLBACKTXN("hive.test.rollbacktxn", false, "For testing only.  Will mark every ACID transaction aborted", false),
     HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For testing only.  Will cause CompactorMR to fail.", false),
     HIVETESTMODEFAILHEARTBEATER("hive.test.fail.heartbeater", false, "For testing only.  Will cause Heartbeater to fail.", false),
+    TESTMODE_BUCKET_CODEC_VERSION("hive.test.bucketcodec.version", 1,
+      "For testing only.  Will make ACID subsystem write RecordIdentifier.bucketId in specified\n" +
+        "format", false),
 
     HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
         "Merge small files at the end of a map-only job"),
index 571e076..7c2cade 100644 (file)
@@ -19,6 +19,8 @@ package org.apache.hive.hcatalog.streaming.mutate.worker;
 
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -68,7 +70,9 @@ public class BucketIdResolverImpl implements BucketIdResolver {
   @Override
   public Object attachBucketIdToRecord(Object record) {
     int bucketId = computeBucketId(record);
-    RecordIdentifier recordIdentifier = new RecordIdentifier(INVALID_TRANSACTION_ID, bucketId, INVALID_ROW_ID);
+    int bucketProperty =
+      BucketCodec.V1.encode(new AcidOutputFormat.Options(null).bucket(bucketId));
+    RecordIdentifier recordIdentifier = new RecordIdentifier(INVALID_TRANSACTION_ID, bucketProperty, INVALID_ROW_ID);
     structObjectInspector.setStructFieldData(record, recordIdentifierField, recordIdentifier);
     return record;
   }
index 1ad0842..ae23153 100644 (file)
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -183,7 +184,7 @@ public class MutatorCoordinator implements Closeable, Flushable {
   private void reconfigureState(OperationType operationType, List<String> newPartitionValues, Object record)
     throws WorkerException {
     RecordIdentifier newRecordIdentifier = extractRecordIdentifier(operationType, newPartitionValues, record);
-    int newBucketId = newRecordIdentifier.getBucketId();
+    int newBucketId = newRecordIdentifier.getBucketProperty();
 
     if (newPartitionValues == null) {
       newPartitionValues = Collections.emptyList();
@@ -209,8 +210,10 @@ public class MutatorCoordinator implements Closeable, Flushable {
   private RecordIdentifier extractRecordIdentifier(OperationType operationType, List<String> newPartitionValues,
       Object record) throws BucketIdException {
     RecordIdentifier recordIdentifier = recordInspector.extractRecordIdentifier(record);
+    int bucketIdFromRecord = BucketCodec.determineVersion(
+      recordIdentifier.getBucketProperty()).decodeWriterId(recordIdentifier.getBucketProperty());
     int computedBucketId = bucketIdResolver.computeBucketId(record);
-    if (operationType != OperationType.DELETE && recordIdentifier.getBucketId() != computedBucketId) {
+    if (operationType != OperationType.DELETE && bucketIdFromRecord != computedBucketId) {
       throw new BucketIdException("RecordIdentifier.bucketId != computed bucketId (" + computedBucketId
           + ") for record " + recordIdentifier + " in partition " + newPartitionValues + ".");
     }
index 8998de9..05cf8b7 100644 (file)
@@ -22,6 +22,8 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -31,20 +33,24 @@ public class MutatorImpl implements Mutator {
 
   private final long transactionId;
   private final Path partitionPath;
-  private final int bucketId;
+  private final int bucketProperty;
   private final Configuration configuration;
   private final int recordIdColumn;
   private final ObjectInspector objectInspector;
   private RecordUpdater updater;
 
+  /**
+   * @param bucketProperty - from existing {@link RecordIdentifier#getBucketProperty()}
+   * @throws IOException
+   */
   public MutatorImpl(Configuration configuration, int recordIdColumn, ObjectInspector objectInspector,
-      AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketId) throws IOException {
+      AcidOutputFormat<?, ?> outputFormat, long transactionId, Path partitionPath, int bucketProperty) throws IOException {
     this.configuration = configuration;
     this.recordIdColumn = recordIdColumn;
     this.objectInspector = objectInspector;
     this.transactionId = transactionId;
     this.partitionPath = partitionPath;
-    this.bucketId = bucketId;
+    this.bucketProperty = bucketProperty;
 
     updater = createRecordUpdater(outputFormat);
   }
@@ -84,10 +90,12 @@ public class MutatorImpl implements Mutator {
   @Override
   public String toString() {
     return "ObjectInspectorMutator [transactionId=" + transactionId + ", partitionPath=" + partitionPath
-        + ", bucketId=" + bucketId + "]";
+        + ", bucketId=" + bucketProperty + "]";
   }
 
   protected RecordUpdater createRecordUpdater(AcidOutputFormat<?, ?> outputFormat) throws IOException {
+    int bucketId = BucketCodec
+      .determineVersion(bucketProperty).decodeWriterId(bucketProperty); 
     return outputFormat.getRecordUpdater(
         partitionPath,
         new AcidOutputFormat.Options(configuration)
index 6867679..de41d34 100644 (file)
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -148,7 +147,7 @@ public class StreamingAssert {
     while (recordReader.next(key, value)) {
       RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier();
       Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(),
-          recordIdentifier.getBucketId(), recordIdentifier.getRowId()), value.toString());
+          recordIdentifier.getBucketProperty(), recordIdentifier.getRowId()), value.toString());
       System.out.println(record);
       records.add(record);
     }
index f1de1df..ab9f313 100644 (file)
@@ -33,6 +33,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hive.hcatalog.streaming.TestStreaming;
 import org.apache.hive.hcatalog.streaming.mutate.StreamingAssert.Factory;
@@ -101,6 +103,10 @@ public class TestMutations {
         .addColumn("msg", "string")
         .bucketCols(Collections.singletonList("string"));
   }
+  private static int encodeBucket(int bucketId) {
+    return BucketCodec.V1.encode(
+      new AcidOutputFormat.Options(null).bucket(bucketId));
+  }
 
   @Test
   public void testTransactionBatchEmptyCommitPartitioned() throws Exception {
@@ -242,7 +248,8 @@ public class TestMutations {
     List<Record> readRecords = streamingAssertions.readRecords();
     assertThat(readRecords.size(), is(1));
     assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
-    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
 
     assertThat(transaction.getState(), is(COMMITTED));
     client.close();
@@ -299,7 +306,8 @@ public class TestMutations {
     List<Record> readRecords = streamingAssertions.readRecords();
     assertThat(readRecords.size(), is(1));
     assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
-    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
 
     // EUROPE_UK
     streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK);
@@ -310,7 +318,8 @@ public class TestMutations {
     readRecords = streamingAssertions.readRecords();
     assertThat(readRecords.size(), is(1));
     assertThat(readRecords.get(0).getRow(), is("{2, Hello streaming}"));
-    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
 
     // EUROPE_FRANCE
     streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
@@ -321,9 +330,11 @@ public class TestMutations {
     readRecords = streamingAssertions.readRecords();
     assertThat(readRecords.size(), is(2));
     assertThat(readRecords.get(0).getRow(), is("{3, Hello streaming}"));
-    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
     assertThat(readRecords.get(1).getRow(), is("{4, Bonjour streaming}"));
-    assertThat(readRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+    assertThat(readRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L,
+      encodeBucket(0), 1L)));
 
     client.close();
   }
@@ -369,7 +380,8 @@ public class TestMutations {
     List<Record> readRecords = streamingAssertions.readRecords();
     assertThat(readRecords.size(), is(1));
     assertThat(readRecords.get(0).getRow(), is("{1, Hello streaming}"));
-    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+    assertThat(readRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
 
     assertThat(transaction.getState(), is(COMMITTED));
     client.close();
@@ -499,13 +511,15 @@ public class TestMutations {
         "Namaste streaming 3"));
 
     mutateCoordinator.update(ASIA_INDIA, new MutableRecord(2, "UPDATED: Namaste streaming 2", new RecordIdentifier(1L,
-        0, 1L)));
+      encodeBucket(0), 1L)));
     mutateCoordinator.insert(ASIA_INDIA, asiaIndiaRecord3);
-    mutateCoordinator.delete(EUROPE_UK, new MutableRecord(3, "Hello streaming 1", new RecordIdentifier(1L, 0, 0L)));
+    mutateCoordinator.delete(EUROPE_UK, new MutableRecord(3, "Hello streaming 1", new RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
     mutateCoordinator.delete(EUROPE_FRANCE,
-        new MutableRecord(5, "Bonjour streaming 1", new RecordIdentifier(1L, 0, 0L)));
+        new MutableRecord(5, "Bonjour streaming 1", new RecordIdentifier(1L,
+          encodeBucket(0), 0L)));
     mutateCoordinator.update(EUROPE_FRANCE, new MutableRecord(6, "UPDATED: Bonjour streaming 2", new RecordIdentifier(
-        1L, 0, 1L)));
+        1L, encodeBucket(0), 1L)));
     mutateCoordinator.close();
 
     mutateTransaction.commit();
@@ -518,11 +532,14 @@ public class TestMutations {
     List<Record> indiaRecords = indiaAssertions.readRecords();
     assertThat(indiaRecords.size(), is(3));
     assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}"));
-    assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 0L)));
+    assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
+      encodeBucket(0), 0L)));
     assertThat(indiaRecords.get(1).getRow(), is("{2, UPDATED: Namaste streaming 2}"));
-    assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+    assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L,
+      encodeBucket(0), 1L)));
     assertThat(indiaRecords.get(2).getRow(), is("{20, Namaste streaming 3}"));
-    assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L, 0, 0L)));
+    assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L,
+      encodeBucket(0), 0L)));
 
     StreamingAssert ukAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK);
     ukAssertions.assertMinTransactionId(1L);
@@ -530,7 +547,8 @@ public class TestMutations {
     List<Record> ukRecords = ukAssertions.readRecords();
     assertThat(ukRecords.size(), is(1));
     assertThat(ukRecords.get(0).getRow(), is("{4, Hello streaming 2}"));
-    assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+    assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
+      encodeBucket(0), 1L)));
 
     StreamingAssert franceAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE);
     franceAssertions.assertMinTransactionId(1L);
@@ -538,7 +556,8 @@ public class TestMutations {
     List<Record> franceRecords = franceAssertions.readRecords();
     assertThat(franceRecords.size(), is(1));
     assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour streaming 2}"));
-    assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, 0, 1L)));
+    assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L,
+      encodeBucket(0), 1L)));
 
     client.close();
   }
index 437946b..03c28a3 100644 (file)
@@ -20,6 +20,8 @@ package org.apache.hive.hcatalog.streaming.mutate.worker;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hive.hcatalog.streaming.mutate.MutableRecord;
@@ -40,7 +42,9 @@ public class TestBucketIdResolverImpl {
   public void testAttachBucketIdToRecord() {
     MutableRecord record = new MutableRecord(1, "hello");
     capturingBucketIdResolver.attachBucketIdToRecord(record);
-    assertThat(record.rowId, is(new RecordIdentifier(-1L, 1, -1L)));
+    assertThat(record.rowId, is(new RecordIdentifier(-1L, 
+      BucketCodec.V1.encode(new AcidOutputFormat.Options(null).bucket(1)),
+      -1L)));
     assertThat(record.id, is(1));
     assertThat(record.msg.toString(), is("hello"));
   }
index 9aeeb31..2273e06 100644 (file)
@@ -75,7 +75,7 @@ public class TestMutatorImpl {
   public void testCreatesRecordReader() throws IOException {
     verify(mockOutputFormat).getRecordUpdater(eq(PATH), captureOptions.capture());
     Options options = captureOptions.getValue();
-    assertThat(options.getBucket(), is(BUCKET_ID));
+    assertThat(options.getBucketId(), is(BUCKET_ID));
     assertThat(options.getConfiguration(), is((Configuration) configuration));
     assertThat(options.getInspector(), is(mockObjectInspector));
     assertThat(options.getRecordIdColumn(), is(RECORD_ID_COLUMN));
index 3e09432..4d46d65 100644 (file)
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -75,7 +76,6 @@ import org.slf4j.LoggerFactory;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -768,8 +768,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         // Find the bucket id, and switch buckets if need to
         ObjectInspector rowInspector = bDynParts ? subSetOI : outputObjInspector;
         Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField);
-        int bucketNum =
+        int bucketProperty =
             bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
+        int bucketNum = 
+          BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
         if (fpaths.acidLastBucket != bucketNum) {
           fpaths.acidLastBucket = bucketNum;
           // Switch files
index 405cfde..a614bde 100644 (file)
@@ -51,7 +51,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     private Reporter reporter;
     private long minimumTransactionId;
     private long maximumTransactionId;
-    private int bucket;
+    private int bucketId;
     /**
      * Based on {@link org.apache.hadoop.hive.ql.metadata.Hive#mvFile(HiveConf, FileSystem, Path, FileSystem, Path, boolean, boolean)}
      * _copy_N starts with 1.
@@ -176,12 +176,12 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     }
 
     /**
-     * The bucket that is included in this file.
-     * @param bucket the bucket number
+     * The bucketId that is included in this file.
+     * @param bucket the bucketId number
      * @return this
      */
     public Options bucket(int bucket) {
-      this.bucket = bucket;
+      this.bucketId = bucket;
       return this;
     }
 
@@ -293,8 +293,8 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
       return writingDeleteDelta;
     }
 
-    public int getBucket() {
-      return bucket;
+    public int getBucketId() {
+      return bucketId;
     }
 
     public int getRecordIdColumn() {
index 1c03736..1e33424 100644 (file)
@@ -196,7 +196,7 @@ public class AcidUtils {
     String subdir;
     if (options.getOldStyle()) {
       return new Path(directory, String.format(LEGACY_FILE_BUCKET_DIGITS,
-          options.getBucket()) + "_0");
+          options.getBucketId()) + "_0");
     } else if (options.isWritingBase()) {
       subdir = BASE_PREFIX + String.format(DELTA_DIGITS,
           options.getMaximumTransactionId());
@@ -217,7 +217,7 @@ public class AcidUtils {
                         options.getMaximumTransactionId(),
                         options.getStatementId());
     }
-    return createBucketFile(new Path(directory, subdir), options.getBucket());
+    return createBucketFile(new Path(directory, subdir), options.getBucketId());
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketCodec.java
new file mode 100644 (file)
index 0000000..d1c2898
--- /dev/null
@@ -0,0 +1,111 @@
+package org.apache.hadoop.hive.ql.io;
+
+/**
+ * This class makes sense of {@link RecordIdentifier#getBucketProperty()}.  Up until ASF Hive 3.0 this
+ * field was simply the bucket ID.  Since 3.0 it does bit packing to store several things:
+ * top 3 bits - version describing the format (we can only have 8).
+ * The rest is version specific - see below.
+ */
+public enum BucketCodec {
+  /**
+   * This is the "legacy" version.  The whole {@code bucket} value just has the bucket ID in it.
+   * The numeric code for this version is 0. (Assumes bucket ID takes less than 29 bits... which
+   * implies top 3 bits are 000 so data written before Hive 3.0 is readable with this scheme).
+   */
+  V0(0) {
+    @Override
+    public int decodeWriterId(int bucketProperty) {
+      return bucketProperty;
+    }
+    @Override
+    public int decodeStatementId(int bucketProperty) {
+      return 0;
+    }
+    @Override
+    public int encode(AcidOutputFormat.Options options) {
+      return options.getBucketId();
+    }
+  },
+  /**
+   * Represents format of "bucket" property in Hive 3.0.
+   * top 3 bits - version code.
+   * next 1 bit - reserved for future
+   * next 12 bits - the bucket ID
+   * next 4 bits reserved for future
+   * remaining 12 bits - the statement ID - 0-based numbering of all statements within a
+   * transaction.  Each leg of a multi-insert statement gets a separate statement ID.
+   * The reserved bits align it so that it easier to interpret it in Hex.
+   * 
+   * Constructs like Merge and Multi-Insert may have multiple tasks writing data that belongs to
+   * the same physical bucket file.  For example, a Merge stmt with update and insert clauses,
+   * (and split update enabled - should be the default in 3.0).  A task on behalf of insert may
+   * be writing a row into bucket 0 and another task in the update branch may be writing an insert
+   * event into bucket 0.  Each of these task are writing to different delta directory - distinguished
+   * by statement ID.  By including both bucket ID and statement ID in {@link RecordIdentifier}
+   * we ensure that {@link RecordIdentifier} is unique.
+   * 
+   * The intent is that sorting rows by {@link RecordIdentifier} groups rows in the same physical
+   * bucket next to each other.
+   * For any row created by a given version of Hive, top 3 bits are constant.  The next
+   * most significant bits are the bucket ID, then the statement ID.  This ensures that
+   * {@link org.apache.hadoop.hive.ql.optimizer.SortedDynPartitionOptimizer} works which is
+   * designed so that each task only needs to keep 1 writer opened at a time.  It could be
+   * configured such that a single writer sees data for multiple buckets so it must "group" data
+   * by bucket ID (and then sort within each bucket as required) which is achieved via sorting
+   * by {@link RecordIdentifier} which includes the {@link RecordIdentifier#getBucketProperty()}
+   * which has the actual bucket ID in the high order bits.  This scheme also ensures that 
+   * {@link org.apache.hadoop.hive.ql.exec.FileSinkOperator#process(Object, int)} works in case
+   * there numBuckets > numReducers.  (The later could be fixed by changing how writers are
+   * initialized in "if (fpaths.acidLastBucket != bucketNum) {")
+   */
+  V1(1) {
+    @Override
+    public int decodeWriterId(int bucketProperty) {
+      return (bucketProperty & 0b0000_1111_1111_1111_0000_0000_0000_0000) >>> 16;
+    }
+    @Override
+    public int decodeStatementId(int bucketProperty) {
+      return (bucketProperty & 0b0000_0000_0000_0000_0000_1111_1111_1111);
+    }
+    @Override
+    public int encode(AcidOutputFormat.Options options) {
+      return this.version << 29 | options.getBucketId() << 16 |
+        (options.getStatementId() >= 0 ? options.getStatementId() : 0);
+    }
+  };
+  private static int TOP3BITS_MASK = 0b1110_0000_0000_0000_0000_0000_0000_0000;
+  public static BucketCodec determineVersion(int bucket) {
+    assert 7 << 29 == BucketCodec.TOP3BITS_MASK;
+    //look at top 3 bits and return appropriate enum
+    try {
+      return getCodec((BucketCodec.TOP3BITS_MASK & bucket) >>> 29);
+    }
+    catch(IllegalArgumentException ex) {
+      throw new IllegalArgumentException(ex.getMessage() + " Cannot decode version from " + bucket);
+    }
+  }
+  public static BucketCodec getCodec(int version) {
+    switch (version) {
+      case 0:
+        return BucketCodec.V0;
+      case 1:
+        return BucketCodec.V1;
+      default:
+        throw new IllegalArgumentException("Illegal 'bucket' format. Version=" + version);
+    }
+  }
+  final int version;
+  BucketCodec(int version) {
+    this.version = version;
+  }
+
+  /**
+   * For bucketed tables this the bucketId, otherwise writerId
+   */
+  public abstract int decodeWriterId(int bucketProperty);
+  public abstract int decodeStatementId(int bucketProperty);
+  public abstract int encode(AcidOutputFormat.Options options);
+  public int getVersion() {
+    return version;
+  }
+}
index 7f2c169..87635c2 100644 (file)
@@ -89,7 +89,7 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> {
         return;
       }
       struct[Field.transactionId.ordinal()] = ri.getTransactionId();
-      struct[Field.bucketId.ordinal()] = ri.getBucketId();
+      struct[Field.bucketId.ordinal()] = ri.getBucketProperty();
       struct[Field.rowId.ordinal()] = ri.getRowId();
     }
   }
@@ -142,10 +142,10 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> {
   }
 
   /**
-   * What was the original bucket id for the last row?
-   * @return the bucket id
+   * See {@link BucketCodec} for details
+   * @return the bucket value;
    */
-  public int getBucketId() {
+  public int getBucketProperty() {
     return bucketId;
   }
 
@@ -219,7 +219,16 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> {
 
   @Override
   public String toString() {
-    return "{originalTxn: " + transactionId + ", bucket: " +
-        bucketId + ", row: " + getRowId() + "}";
+    BucketCodec codec = 
+      BucketCodec.determineVersion(bucketId);
+    String s = "(" + codec.getVersion() + "." + codec.decodeWriterId(bucketId) +
+      "." + codec.decodeStatementId(bucketId) + ")";
+    return "{originalTxn: " + transactionId + ", " + bucketToString() + ", row: " + getRowId() +"}";
+  }
+  protected String bucketToString() {
+    BucketCodec codec =
+      BucketCodec.determineVersion(bucketId);
+    return  "bucket: " + bucketId + "(" + codec.getVersion() + "." +
+      codec.decodeWriterId(bucketId) + "." + codec.decodeStatementId(bucketId) + ")";
   }
 }
index f9e17a9..de49fc8 100644 (file)
@@ -1953,10 +1953,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     if (split.hasBase()) {
       AcidOutputFormat.Options acidIOOptions =
         AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf);
-      if(acidIOOptions.getBucket() < 0) {
+      if(acidIOOptions.getBucketId() < 0) {
         LOG.warn("Can't determine bucket ID for " + split.getPath() + "; ignoring");
       }
-      bucket = acidIOOptions.getBucket();
+      bucket = acidIOOptions.getBucketId();
       if(split.isOriginal()) {
         mergerOptions.copyIndex(acidIOOptions.getCopyNumber()).bucketPath(split.getPath());
       }
@@ -2033,7 +2033,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
       AcidOutputFormat.Options bucketInfo =
         AcidUtils.parseBaseOrDeltaBucketFilename(stat.getPath(), fs.getConf());
-      if(bucketInfo.getBucket() == bucket) {
+      if(bucketInfo.getBucketId() == bucket) {
         return stat.getPath();
       }
     }
@@ -2211,7 +2211,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         AcidOutputFormat.Options opts = AcidUtils.parseBaseOrDeltaBucketFilename
             (child.getFileStatus().getPath(), context.conf);
         opts.writingBase(true);
-        int b = opts.getBucket();
+        int b = opts.getBucketId();
         // If the bucket is in the valid range, mark it as covered.
         // I wish Hive actually enforced bucketing all of the time.
         if (b >= 0 && b < covered.length) {
index ffcdf6a..650f2af 100644 (file)
@@ -80,6 +80,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
   @VisibleForTesting
   public final static class ReaderKey extends RecordIdentifier{
     private long currentTransactionId;
+    /**
+     * This is the value from delta file name which may be different from value encode in 
+     * {@link RecordIdentifier#getBucketProperty()} in case of Update/Delete.
+     * So for Acid 1.0 + multi-stmt txn, if {@code isSameRow() == true}, then it must be an update
+     * or delete event.  For Acid 2.0 + multi-stmt txn, it must be a delete event.
+     * No 2 Insert events from can ever agree on {@link RecordIdentifier}
+     */
     private int statementId;//sort on this descending, like currentTransactionId
 
     public ReaderKey() {
@@ -174,8 +181,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
     @Override
     public String toString() {
-      return "{originalTxn: " + getTransactionId() + ", bucket: " +
-          getBucketId() + ", row: " + getRowId() + ", currentTxn: " +
+      return "{originalTxn: " + getTransactionId() + ", " +
+          bucketToString() + ", row: " + getRowId() + ", currentTxn: " +
           currentTransactionId + ", statementId: "+ statementId + "}";
     }
   }
@@ -375,7 +382,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
             AcidOutputFormat.Options bucketOptions =
               AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-            if (bucketOptions.getBucket() != bucket) {
+            if (bucketOptions.getBucketId() != bucket) {
               continue;
             }
             if(haveSeenCurrentFile) {
@@ -426,7 +433,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
             AcidOutputFormat.Options bucketOptions =
               AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
-            if (bucketOptions.getBucket() == bucket) {
+            if (bucketOptions.getBucketId() == bucket) {
               numFilesInBucket++;
               if(numFilesInBucket > 1) {
                 isLastFileForThisBucket = false;
@@ -540,7 +547,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     private Reader advanceToNextFile() throws IOException {
       while(nextFileIndex < originalFiles.size()) {
         AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(originalFiles.get(nextFileIndex).getFileStatus().getPath(), conf);
-        if (bucketOptions.getBucket() == bucket) {
+        if (bucketOptions.getBucketId() == bucket) {
           break;
         }
         nextFileIndex++;
index 65f4a24..d40b89a 100644 (file)
@@ -29,10 +29,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -48,10 +51,12 @@ import org.apache.orc.impl.OrcAcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * A RecordUpdater where the files are stored as ORC.
+ * A note on various record structures: the {@code row} coming in (as in {@link #insert(long, Object)}
+ * for example), is a struct like <RecordIdentifier, f1, ... fn> but what is written to the file
+ * * is <op, otid, writerId, rowid, ctid, <f1, ... fn>> (see {@link #createEventSchema(ObjectInspector)})
+ * So there are OIs here to make the translation.
  */
 public class OrcRecordUpdater implements RecordUpdater {
 
@@ -96,7 +101,6 @@ public class OrcRecordUpdater implements RecordUpdater {
   private final IntWritable bucket = new IntWritable();
   private final LongWritable rowId = new LongWritable();
   private long insertedRows = 0;
-  private long rowIdOffset = 0;
   // This records how many rows have been inserted or deleted.  It is separate from insertedRows
   // because that is monotonically increasing to give new unique row ids.
   private long rowCountDelta = 0;
@@ -111,6 +115,7 @@ public class OrcRecordUpdater implements RecordUpdater {
   private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier
   private LongObjectInspector origTxnInspector; // OI for the original txn inside the record
   // identifer
+  private IntObjectInspector bucketInspector;
 
   static int getOperation(OrcStruct struct) {
     return ((IntWritable) struct.getFieldValue(OPERATION)).get();
@@ -200,7 +205,18 @@ public class OrcRecordUpdater implements RecordUpdater {
       this.acidOperationalProperties =
           AcidUtils.getAcidOperationalProperties(options.getConfiguration());
     }
-    this.bucket.set(options.getBucket());
+    BucketCodec bucketCodec = BucketCodec.V1;
+    if(options.getConfiguration() != null) {
+      //so that we can test "old" files
+      Configuration hc = options.getConfiguration();
+      if(hc.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST.name(), false) ||
+        hc.getBoolean(HiveConf.ConfVars.HIVE_IN_TEZ_TEST.name(), false)) {
+        bucketCodec = BucketCodec.getCodec(
+          hc.getInt(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION.name(),
+            BucketCodec.V1.getVersion()));
+      }
+    }
+    this.bucket.set(bucketCodec.encode(options));
     this.path = AcidUtils.createFilename(path, options);
     this.deleteEventWriter = null;
     this.deleteEventPath = null;
@@ -283,41 +299,6 @@ public class OrcRecordUpdater implements RecordUpdater {
   public String toString() {
     return getClass().getName() + "[" + path +"]";
   }
-  /**
-   * To handle multiple INSERT... statements in a single transaction, we want to make sure
-   * to generate unique {@code rowId} for all inserted rows of the transaction.
-   * @return largest rowId created by previous statements (maybe 0)
-   * @throws IOException
-   */
-  private long findRowIdOffsetForInsert() throws IOException {
-    /*
-    * 1. need to know bucket we are writing to
-    * 2. need to know which delta dir it's in
-    * Then,
-    * 1. find the same bucket file in previous (insert) delta dir for this txn
-    *    (Note: in case of split_update, we can ignore the delete_delta dirs)
-    * 2. read the footer and get AcidStats which has insert count
-     * 2.1 if AcidStats.inserts>0 add to the insert count.
-     *  else go to previous delta file
-     *  For example, consider insert/update/insert case...*/
-    if(options.getStatementId() <= 0) {
-      return 0;//there is only 1 statement in this transaction (so far)
-    }
-    long totalInserts = 0;
-    for(int pastStmt = options.getStatementId() - 1; pastStmt >= 0; pastStmt--) {
-      Path matchingBucket = AcidUtils.createFilename(options.getFinalDestination(), options.clone().statementId(pastStmt));
-      if(!fs.exists(matchingBucket)) {
-        continue;
-      }
-      Reader reader = OrcFile.createReader(matchingBucket, OrcFile.readerOptions(options.getConfiguration()));
-      //no close() on Reader?!
-      AcidStats acidStats = OrcAcidUtils.parseAcidStats(reader);
-      if(acidStats.inserts > 0) {
-        totalInserts += acidStats.inserts;
-      }
-    }
-    return totalInserts;
-  }
   // Find the record identifier column (if there) and return a possibly new ObjectInspector that
   // will strain out the record id for the underlying writer.
   private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) {
@@ -338,6 +319,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       originalTxnField = fields.get(0);
       origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector();
       bucketField = fields.get(1);
+      bucketInspector = (IntObjectInspector) bucketField.getFieldObjectInspector();
       rowIdField = fields.get(2);
       rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector();
 
@@ -346,11 +328,11 @@ public class OrcRecordUpdater implements RecordUpdater {
       return newInspector;
     }
   }
-
   private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row)
       throws IOException {
     this.operation.set(operation);
     this.currentTransaction.set(currentTransaction);
+    Integer currentBucket = null;
     // If this is an insert, originalTransaction should be set to this transaction.  If not,
     // it will be reset by the following if anyway.
     long originalTransaction = currentTransaction;
@@ -359,9 +341,8 @@ public class OrcRecordUpdater implements RecordUpdater {
       originalTransaction = origTxnInspector.get(
           recIdInspector.getStructFieldData(rowIdValue, originalTxnField));
       rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField));
-    }
-    else if(operation == INSERT_OPERATION) {
-      rowId += rowIdOffset;
+      currentBucket = setBucket(bucketInspector.get(
+        recIdInspector.getStructFieldData(rowIdValue, bucketField)), operation);
     }
     this.rowId.set(rowId);
     this.originalTransaction.set(originalTransaction);
@@ -372,6 +353,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       writer = OrcFile.createWriter(path, writerOptions);
     }
     writer.addRow(item);
+    restoreBucket(currentBucket, operation);
   }
 
   private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row)
@@ -388,8 +370,11 @@ public class OrcRecordUpdater implements RecordUpdater {
             recIdInspector.getStructFieldData(rowValue, originalTxnField));
     rowId = rowIdInspector.get(
             recIdInspector.getStructFieldData(rowValue, rowIdField));
+    Integer currentBucket = null;
 
     if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
+      currentBucket = setBucket(bucketInspector.get(
+        recIdInspector.getStructFieldData(rowValue, bucketField)), operation);
       // Initialize a deleteEventWriter if not yet done. (Lazy initialization)
       if (deleteEventWriter == null) {
         // Initialize an indexBuilder for deleteEvents.
@@ -414,6 +399,7 @@ public class OrcRecordUpdater implements RecordUpdater {
       item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events.
       deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, bucket.get(), rowId);
       deleteEventWriter.addRow(item);
+      restoreBucket(currentBucket, operation);
     }
 
     if (operation == UPDATE_OPERATION) {
@@ -426,9 +412,6 @@ public class OrcRecordUpdater implements RecordUpdater {
   public void insert(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
       insertedRows = 0;
-      //this method is almost no-op in hcatalog.streaming case since statementId == 0 is
-      //always true in that case
-      rowIdOffset = findRowIdOffsetForInsert();
     }
     if (acidOperationalProperties.isSplitUpdate()) {
       addSplitUpdateEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
@@ -442,7 +425,6 @@ public class OrcRecordUpdater implements RecordUpdater {
   public void update(long currentTransaction, Object row) throws IOException {
     if (this.currentTransaction.get() != currentTransaction) {
       insertedRows = 0;
-      rowIdOffset = findRowIdOffsetForInsert();
     }
     if (acidOperationalProperties.isSplitUpdate()) {
       addSplitUpdateEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
@@ -664,4 +646,15 @@ public class OrcRecordUpdater implements RecordUpdater {
       return recId;
     }
   }
+  private void restoreBucket(Integer currentBucket, int operation) {
+    if(currentBucket != null) {
+      setBucket(currentBucket, operation);
+    }
+  }
+  private int setBucket(int bucketProperty, int operation) {
+    assert operation == UPDATE_OPERATION || operation == DELETE_OPERATION;
+    int currentBucketProperty = bucket.get();
+    bucket.set(bucketProperty);
+    return currentBucketProperty;
+  }
 }
index 29f5a8e..8f80710 100644 (file)
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputSplit;
@@ -353,7 +354,7 @@ public class VectorizedOrcAcidRowBatchReader
       throws IOException {
         final Path[] deleteDeltas = getDeleteDeltaDirsFromSplit(orcSplit);
         if (deleteDeltas.length > 0) {
-          int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
+          int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId();
           String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
           this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
           OrcRawRecordMerger.Options mergerOptions = new OrcRawRecordMerger.Options().isCompacting(false);
@@ -470,9 +471,9 @@ public class VectorizedOrcAcidRowBatchReader
    * An implementation for DeleteEventRegistry that optimizes for performance by loading
    * all the delete events into memory at once from all the delete delta files.
    * It starts by reading all the delete events through a regular sort merge logic
-   * into two vectors- one for original transaction id (otid), and the other for row id.
-   * (In the current version, since the bucket id should be same for all the delete deltas,
-   * it is not stored). The otids are likely to be repeated very often, as a single transaction
+   * into 3 vectors- one for original transaction id (otid), one for bucket property and one for
+   * row id.  See {@link BucketCodec} for more about bucket property.
+   * The otids are likely to be repeated very often, as a single transaction
    * often deletes thousands of rows. Hence, the otid vector is compressed to only store the
    * toIndex and fromIndex ranges in the larger row id vector. Now, querying whether a
    * record id is deleted or not, is done by performing a binary search on the
@@ -483,21 +484,22 @@ public class VectorizedOrcAcidRowBatchReader
    */
    static class ColumnizedDeleteEventRegistry implements DeleteEventRegistry {
     /**
-     * A simple wrapper class to hold the (otid, rowId) pair.
+     * A simple wrapper class to hold the (otid, bucketProperty, rowId) pair.
      */
     static class DeleteRecordKey implements Comparable<DeleteRecordKey> {
       private long originalTransactionId;
+      /**
+       * see {@link BucketCodec}
+       */
+      private int bucketProperty; 
       private long rowId;
       public DeleteRecordKey() {
         this.originalTransactionId = -1;
         this.rowId = -1;
       }
-      public DeleteRecordKey(long otid, long rowId) {
-        this.originalTransactionId = otid;
-        this.rowId = rowId;
-      }
-      public void set(long otid, long rowId) {
+      public void set(long otid, int bucketProperty, long rowId) {
         this.originalTransactionId = otid;
+        this.bucketProperty = bucketProperty;
         this.rowId = rowId;
       }
 
@@ -509,11 +511,18 @@ public class VectorizedOrcAcidRowBatchReader
         if (originalTransactionId != other.originalTransactionId) {
           return originalTransactionId < other.originalTransactionId ? -1 : 1;
         }
+        if(bucketProperty != other.bucketProperty) {
+          return bucketProperty < other.bucketProperty ? -1 : 1;
+        }
         if (rowId != other.rowId) {
           return rowId < other.rowId ? -1 : 1;
         }
         return 0;
       }
+      @Override
+      public String toString() {
+        return "otid: " + originalTransactionId + " bucketP:" + bucketProperty + " rowid: " + rowId;
+      }
     }
 
     /**
@@ -528,6 +537,7 @@ public class VectorizedOrcAcidRowBatchReader
       private int indexPtrInBatch;
       private final int bucketForSplit; // The bucket value should be same for all the records.
       private final ValidTxnList validTxnList;
+      private boolean isBucketPropertyRepeating;
 
       public DeleteReaderValue(Reader deleteDeltaReader, Reader.Options readerOptions, int bucket,
           ValidTxnList validTxnList) throws IOException {
@@ -539,6 +549,7 @@ public class VectorizedOrcAcidRowBatchReader
         }
         this.indexPtrInBatch = 0;
         this.validTxnList = validTxnList;
+        checkBucketId();//check 1st batch
       }
 
       public boolean next(DeleteRecordKey deleteRecordKey) throws IOException {
@@ -550,37 +561,19 @@ public class VectorizedOrcAcidRowBatchReader
           if (indexPtrInBatch >= batch.size) {
             // We have exhausted our current batch, read the next batch.
             if (recordReader.nextBatch(batch)) {
-              // Whenever we are reading a batch, we must ensure that all the records in the batch
-              // have the same bucket id as the bucket id of the split. If not, throw exception.
-              // NOTE: this assertion might not hold, once virtual bucketing is in place. However,
-              // it should be simple to fix that case. Just replace check for bucket equality with
-              // a check for valid bucket mapping. Until virtual bucketing is added, it means
-              // either the split computation got messed up or we found some corrupted records.
-              long bucketForRecord = ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0];
-              if ((batch.size > 1 && !batch.cols[OrcRecordUpdater.BUCKET].isRepeating)
-                  || (bucketForRecord != bucketForSplit)){
-                throw new IOException("Corrupted records with different bucket ids "
-                    + "from the containing bucket file found! Expected bucket id "
-                    + bucketForSplit + ", however found the bucket id " + bucketForRecord);
-              }
+              checkBucketId();
               indexPtrInBatch = 0; // After reading the batch, reset the pointer to beginning.
             } else {
               return false; // no more batches to read, exhausted the reader.
             }
           }
-          int originalTransactionIndex =
-              batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 0 : indexPtrInBatch;
-          long originalTransaction =
-              ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[originalTransactionIndex];
-          long rowId = ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch];
-          int currentTransactionIndex =
-              batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? 0 : indexPtrInBatch;
-          long currentTransaction =
-              ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[currentTransactionIndex];
+          long currentTransaction = setCurrentDeleteKey(deleteRecordKey);
+          if(!isBucketPropertyRepeating) {
+            checkBucketId(deleteRecordKey.bucketProperty);
+          }
           ++indexPtrInBatch;
           if (validTxnList.isTxnValid(currentTransaction)) {
             isValidNext = true;
-            deleteRecordKey.set(originalTransaction, rowId);
           }
         }
         return true;
@@ -589,8 +582,51 @@ public class VectorizedOrcAcidRowBatchReader
       public void close() throws IOException {
         this.recordReader.close();
       }
+      private long setCurrentDeleteKey(DeleteRecordKey deleteRecordKey) {
+        int originalTransactionIndex =
+          batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? 0 : indexPtrInBatch;
+        long originalTransaction =
+          ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[originalTransactionIndex];
+        int bucketPropertyIndex =
+          batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? 0 : indexPtrInBatch;
+        int bucketProperty = (int)((LongColumnVector)batch.cols[OrcRecordUpdater.BUCKET]).vector[bucketPropertyIndex];
+        long rowId = ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[indexPtrInBatch];
+        int currentTransactionIndex =
+          batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating ? 0 : indexPtrInBatch;
+        long currentTransaction =
+          ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[currentTransactionIndex];
+        deleteRecordKey.set(originalTransaction, bucketProperty, rowId);
+        return currentTransaction;
+      }
+      private void checkBucketId() throws IOException {
+        isBucketPropertyRepeating = batch.cols[OrcRecordUpdater.BUCKET].isRepeating;
+        if(isBucketPropertyRepeating) {
+          int bucketPropertyFromRecord = (int)((LongColumnVector)
+            batch.cols[OrcRecordUpdater.BUCKET]).vector[0];
+          checkBucketId(bucketPropertyFromRecord);
+        }
+      }
+      /**
+       * Whenever we are reading a batch, we must ensure that all the records in the batch
+       * have the same bucket id as the bucket id of the split. If not, throw exception.
+       * NOTE: this assertion might not hold, once virtual bucketing is in place. However,
+       * it should be simple to fix that case. Just replace check for bucket equality with
+       * a check for valid bucket mapping. Until virtual bucketing is added, it means
+       * either the split computation got messed up or we found some corrupted records.
+       */
+      private void checkBucketId(int bucketPropertyFromRecord) throws IOException {
+        int bucketIdFromRecord = BucketCodec.determineVersion(bucketPropertyFromRecord)
+          .decodeWriterId(bucketPropertyFromRecord);
+        if(bucketIdFromRecord != bucketForSplit) {
+          DeleteRecordKey dummy = new DeleteRecordKey();
+          long curTxnId = setCurrentDeleteKey(dummy);
+          throw new IOException("Corrupted records with different bucket ids "
+            + "from the containing bucket file found! Expected bucket id "
+            + bucketForSplit + ", however found the bucket id " + bucketIdFromRecord +
+            " from " + dummy + " curTxnId: " + curTxnId);
+        }
+      }
     }
-
     /**
      * A CompressedOtid class stores a compressed representation of the original
      * transaction ids (otids) read from the delete delta files. Since the record ids
@@ -599,13 +635,15 @@ public class VectorizedOrcAcidRowBatchReader
      * the toIndex. These fromIndex and toIndex reference the larger vector formed by
      * concatenating the correspondingly ordered rowIds.
      */
-    private class CompressedOtid implements Comparable<CompressedOtid> {
-      long originalTransactionId;
-      int fromIndex; // inclusive
-      int toIndex; // exclusive
+    private final class CompressedOtid implements Comparable<CompressedOtid> {
+      final long originalTransactionId;
+      final int bucketProperty;
+      final int fromIndex; // inclusive
+      final int toIndex; // exclusive
 
-      public CompressedOtid(long otid, int fromIndex, int toIndex) {
+      CompressedOtid(long otid, int bucketProperty, int fromIndex, int toIndex) {
         this.originalTransactionId = otid;
+        this.bucketProperty = bucketProperty;
         this.fromIndex = fromIndex;
         this.toIndex = toIndex;
       }
@@ -616,10 +654,24 @@ public class VectorizedOrcAcidRowBatchReader
         if (originalTransactionId != other.originalTransactionId) {
           return originalTransactionId < other.originalTransactionId ? -1 : 1;
         }
+        if(bucketProperty != other.bucketProperty) {
+          return bucketProperty < other.bucketProperty ? -1 : 1;
+        }
         return 0;
       }
     }
 
+    /**
+     * Food for thought:
+     * this is a bit problematic - in order to load ColumnizedDeleteEventRegistry we still open
+     * all delete deltas at once - possibly causing OOM same as for {@link SortMergedDeleteEventRegistry}
+     * which uses {@link OrcRawRecordMerger}.  Why not load all delete_delta sequentially.  Each
+     * dd is sorted by {@link RecordIdentifier} so we could create a BTree like structure where the
+     * 1st level is an array of originalTransactionId where each entry points at an array
+     * of bucketIds where each entry points at an array of rowIds.  We could probably use ArrayList
+     * to manage insertion as the structure is built (LinkedList?).  This should reduce memory
+     * footprint (as far as OrcReader to a single reader) - probably bad for LLAP IO
+     */
     private TreeMap<DeleteRecordKey, DeleteReaderValue> sortMerger;
     private long rowIds[];
     private CompressedOtid compressedOtids[];
@@ -627,7 +679,7 @@ public class VectorizedOrcAcidRowBatchReader
 
     public ColumnizedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit,
         Reader.Options readerOptions) throws IOException, DeleteEventsOverflowMemoryException {
-      int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucket();
+      int bucket = AcidUtils.parseBaseOrDeltaBucketFilename(orcSplit.getPath(), conf).getBucketId();
       String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
       this.validTxnList = (txnString == null) ? new ValidReadTxnList() : new ValidReadTxnList(txnString);
       this.sortMerger = new TreeMap<DeleteRecordKey, DeleteReaderValue>();
@@ -690,11 +742,22 @@ public class VectorizedOrcAcidRowBatchReader
       }
     }
 
+    /**
+     * This is not done quite right.  The intent of {@link CompressedOtid} is a hedge against
+     * "delete from T" that generates a huge number of delete events possibly even 2G - max array
+     * size.  (assuming no one txn inserts > 2G rows (in a bucket)).  As implemented, the algorithm
+     * first loads all data into one array otid[] and rowIds[] which defeats the purpose.
+     * In practice we should be filtering delete evens by min/max ROW_ID from the split.  The later
+     * is also not yet implemented: HIVE-16812.
+     */
     private void readAllDeleteEventsFromDeleteDeltas() throws IOException {
       if (sortMerger == null || sortMerger.isEmpty()) return; // trivial case, nothing to read.
       int distinctOtids = 0;
       long lastSeenOtid = -1;
+      int lastSeenBucketProperty = -1;
       long otids[] = new long[rowIds.length];
+      int[] bucketProperties = new int [rowIds.length];
+      
       int index = 0;
       while (!sortMerger.isEmpty()) {
         // The sortMerger is a heap data structure that stores a pair of
@@ -710,11 +773,14 @@ public class VectorizedOrcAcidRowBatchReader
         DeleteRecordKey deleteRecordKey = entry.getKey();
         DeleteReaderValue deleteReaderValue = entry.getValue();
         otids[index] = deleteRecordKey.originalTransactionId;
+        bucketProperties[index] = deleteRecordKey.bucketProperty;
         rowIds[index] = deleteRecordKey.rowId;
         ++index;
-        if (lastSeenOtid != deleteRecordKey.originalTransactionId) {
+        if (lastSeenOtid != deleteRecordKey.originalTransactionId ||
+          lastSeenBucketProperty != deleteRecordKey.bucketProperty) {
           ++distinctOtids;
           lastSeenOtid = deleteRecordKey.originalTransactionId;
+          lastSeenBucketProperty = deleteRecordKey.bucketProperty;
         }
         if (deleteReaderValue.next(deleteRecordKey)) {
           sortMerger.put(deleteRecordKey, deleteReaderValue);
@@ -728,20 +794,24 @@ public class VectorizedOrcAcidRowBatchReader
       // the fromIndex(inclusive) and toIndex(exclusive) for each unique otid.
       this.compressedOtids = new CompressedOtid[distinctOtids];
       lastSeenOtid = otids[0];
+      lastSeenBucketProperty = bucketProperties[0];
       int fromIndex = 0, pos = 0;
       for (int i = 1; i < otids.length; ++i) {
-        if (otids[i] != lastSeenOtid) {
-          compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, i);
+        if (otids[i] != lastSeenOtid || lastSeenBucketProperty != bucketProperties[i]) {
+          compressedOtids[pos] = 
+            new CompressedOtid(lastSeenOtid, lastSeenBucketProperty, fromIndex, i);
           lastSeenOtid = otids[i];
+          lastSeenBucketProperty = bucketProperties[i];
           fromIndex = i;
           ++pos;
         }
       }
       // account for the last distinct otid
-      compressedOtids[pos] = new CompressedOtid(lastSeenOtid, fromIndex, otids.length);
+      compressedOtids[pos] =
+        new CompressedOtid(lastSeenOtid, lastSeenBucketProperty, fromIndex, otids.length);
     }
 
-    private boolean isDeleted(long otid, long rowId) {
+    private boolean isDeleted(long otid, int bucketProperty, long rowId) {
       if (compressedOtids == null || rowIds == null) {
         return false;
       }
@@ -755,8 +825,8 @@ public class VectorizedOrcAcidRowBatchReader
           || otid > compressedOtids[compressedOtids.length - 1].originalTransactionId) {
         return false;
       }
-      // Create a dummy key for searching the otid in the compressed otid ranges.
-      CompressedOtid key = new CompressedOtid(otid, -1, -1);
+      // Create a dummy key for searching the otid/bucket in the compressed otid ranges.
+      CompressedOtid key = new CompressedOtid(otid, bucketProperty, -1, -1);
       int pos = Arrays.binarySearch(compressedOtids, key);
       if (pos >= 0) {
         // Otid with the given value found! Searching now for rowId...
@@ -788,6 +858,12 @@ public class VectorizedOrcAcidRowBatchReader
       long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1
           : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0];
 
+      long[] bucketProperties =
+        batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? null
+          : ((LongColumnVector)batch.cols[OrcRecordUpdater.BUCKET]).vector;
+      int repeatedBucketProperty = (bucketProperties != null) ? -1
+        : (int)((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0];
+
       long[] rowIdVector =
           ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector;
 
@@ -796,8 +872,10 @@ public class VectorizedOrcAcidRowBatchReader
           setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1)) {
         long otid = originalTransactionVector != null ? originalTransactionVector[setBitIndex]
                                                     : repeatedOriginalTransaction ;
+        int bucketProperty = bucketProperties != null ? (int)bucketProperties[setBitIndex]
+          : repeatedBucketProperty;
         long rowId = rowIdVector[setBitIndex];
-        if (isDeleted(otid, rowId)) {
+        if (isDeleted(otid, bucketProperty, rowId)) {
           selectedBitSet.clear(setBitIndex);
         }
      }
index 0541a40..78c511b 100644 (file)
@@ -696,15 +696,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
       if(numWhenMatchedUpdateClauses > 1) {
         throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_UPDATE, ctx.getCmd());
       }
-      assert numInsertClauses < 2;
-      if(numInsertClauses == 1 && numWhenMatchedUpdateClauses == 1) {
-        if(AcidUtils.getAcidOperationalProperties(targetTable).isSplitUpdate()) {
-          throw new IllegalStateException("Tables with " +
-            hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "=" +
-            TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY + " currently do not " +
-            "support MERGE with both Insert and Update clauses.");
-        }
-      }
+      assert numInsertClauses < 2: "too many Insert clauses";
     }
     if(numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) {
       throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd());
index 461ef86..1de7604 100755 (executable)
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.CastDecimalToLong;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.CastDoubleToLong;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.CastStringToLong;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.CastTimestampToLong;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -220,7 +221,9 @@ public class UDFToInteger extends UDF {
     if (i == null) {
       return null;
     } else {
-      intWritable.set(i.getBucketId());
+      BucketCodec decoder =
+        BucketCodec.determineVersion(i.getBucketProperty());
+      intWritable.set(decoder.decodeWriterId(i.getBucketProperty()));
       return intWritable;
     }
   }
index 5fb89d0..c531aeb 100644 (file)
@@ -493,53 +493,10 @@ public class TestTxnCommands {
    * sorts rows in dictionary order
    */
   private List<String> stringifyValues(int[][] rowsIn) {
-    assert rowsIn.length > 0;
-    int[][] rows = rowsIn.clone();
-    Arrays.sort(rows, new RowComp());
-    List<String> rs = new ArrayList<String>();
-    for(int[] row : rows) {
-      assert row.length > 0;
-      StringBuilder sb = new StringBuilder();
-      for(int value : row) {
-        sb.append(value).append("\t");
-      }
-      sb.setLength(sb.length() - 1);
-      rs.add(sb.toString());
-    }
-    return rs;
-  }
-  private static final class RowComp implements Comparator<int[]> {
-    @Override
-    public int compare(int[] row1, int[] row2) {
-      assert row1 != null && row2 != null && row1.length == row2.length;
-      for(int i = 0; i < row1.length; i++) {
-        int comp = Integer.compare(row1[i], row2[i]);
-        if(comp != 0) {
-          return comp;
-        }
-      }
-      return 0;
-    }
+    return TestTxnCommands2.stringifyValues(rowsIn);
   }
   private String makeValuesClause(int[][] rows) {
-    assert rows.length > 0;
-    StringBuilder sb = new StringBuilder("values");
-    for(int[] row : rows) {
-      assert row.length > 0;
-      if(row.length > 1) {
-        sb.append("(");
-      }
-      for(int value : row) {
-        sb.append(value).append(",");
-      }
-      sb.setLength(sb.length() - 1);//remove trailing comma
-      if(row.length > 1) {
-        sb.append(")");
-      }
-      sb.append(",");
-    }
-    sb.setLength(sb.length() - 1);//remove trailing comma
-    return sb.toString();
+    return TestTxnCommands2.makeValuesClause(rows);
   }
 
   private List<String> runStatementOnDriver(String stmt) throws Exception {
@@ -559,7 +516,6 @@ public class TestTxnCommands {
     throw new RuntimeException("Didn't get expected failure!");
   }
 
-//  @Ignore
   @Test
   public void exchangePartition() throws Exception {
     runStatementOnDriver("create database ex1");
@@ -757,9 +713,9 @@ public class TestTxnCommands {
     runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
     String query = "merge into " + Table.ACIDTBL +
       " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
-      "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
-      "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " +
-      "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+      "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " + //updates (2,1) -> (2,0)
+      "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " +//deletes (4,3)
+      "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";//inserts (11,11)
     runStatementOnDriver(query);
 
     List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
@@ -910,7 +866,7 @@ public class TestTxnCommands {
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/000001_0"));
     Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
     Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":1,\"rowid\":0}\t1\t17"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
     Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/000001_0_copy_1"));
     //run Compaction
     runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
@@ -927,7 +883,7 @@ public class TestTxnCommands {
     Assert.assertTrue(rs.get(1), rs.get(1).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
     Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5"));
     Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
-    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":1,\"rowid\":0}\t1\t17"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17"));
     Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/base_0000014/bucket_00001"));
 
     //make sure they are the same before and after compaction
index 31921f1..ed1a328 100644 (file)
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -87,19 +88,27 @@ public class TestTxnCommands2 {
   protected Driver d;
   protected static enum Table {
     ACIDTBL("acidTbl"),
-    ACIDTBLPART("acidTblPart"),
+    ACIDTBLPART("acidTblPart", "p"),
     NONACIDORCTBL("nonAcidOrcTbl"),
-    NONACIDPART("nonAcidPart"),
-    NONACIDPART2("nonAcidPart2"),
-    ACIDNESTEDPART("acidNestedPart");
+    NONACIDPART("nonAcidPart", "p"),
+    NONACIDPART2("nonAcidPart2", "p2"),
+    ACIDNESTEDPART("acidNestedPart", "p,q");
 
     private final String name;
+    private final String partitionColumns;
     @Override
     public String toString() {
       return name;
     }
+    String getPartitionColumns() {
+      return partitionColumns;
+    }
     Table(String name) {
+      this(name, null);
+    }
+    Table(String name, String partitionColumns) {
       this.name = name;
+      this.partitionColumns = partitionColumns;
     }
   }
 
@@ -353,14 +362,14 @@ public class TestTxnCommands2 {
      */
     String[][] expected = {
       {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t13",  "bucket_00000"},
-      {"{\"transactionid\":18,\"bucketid\":0,\"rowid\":0}\t0\t15", "bucket_00000"},
-      {"{\"transactionid\":20,\"bucketid\":0,\"rowid\":0}\t0\t17", "bucket_00000"},
+      {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"},
+      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"},
       {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":1}\t0\t120", "bucket_00000"},
       {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t2",   "bucket_00001"},
       {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":3}\t1\t4",   "bucket_00001"},
       {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":2}\t1\t5",   "bucket_00001"},
       {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":4}\t1\t6",   "bucket_00001"},
-      {"{\"transactionid\":18,\"bucketid\":1,\"rowid\":0}\t1\t16", "bucket_00001"}
+      {"{\"transactionid\":18,\"bucketid\":536936448,\"rowid\":0}\t1\t16", "bucket_00001"}
     };
     Assert.assertEquals("Unexpected row count before compaction", expected.length, rs.size());
     for(int i = 0; i < expected.length; i++) {
@@ -539,7 +548,7 @@ public class TestTxnCommands2 {
     List<String> rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     int [][] resultData = new int[][] {{1, 2}};
     Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
     int resultCount = 1;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
@@ -555,7 +564,7 @@ public class TestTxnCommands2 {
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 2}};
     Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
     resultCount = 1;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
@@ -748,7 +757,7 @@ public class TestTxnCommands2 {
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 3}, {3, 4}};
     Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
     resultCount = 2;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
@@ -784,7 +793,7 @@ public class TestTxnCommands2 {
     rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);
     resultData = new int[][] {{1, 3}, {3, 4}};
     Assert.assertEquals(stringifyValues(resultData), rs);
-    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);
+    rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *?
     resultCount = 2;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
 
@@ -1500,12 +1509,13 @@ public class TestTxnCommands2 {
     String query = "merge into " + Table.ACIDTBL +
       " t using " + Table.NONACIDPART2 + " s ON t.a = s.a2 " +
       "WHEN MATCHED AND t.b between 1 and 3 THEN UPDATE set b = s.b2 " +
-      "WHEN NOT MATCHED and s.b2 >= 11 THEN INSERT VALUES(s.a2, s.b2)";
+      "WHEN NOT MATCHED and s.b2 >= 8 THEN INSERT VALUES(s.a2, s.b2)";
     runStatementOnDriver(query);
 
     r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
-    int[][] rExpected = {{2,2},{4,3},{5,6},{7,8},{11,11}};
+    int[][] rExpected = {{2,2},{4,3},{5,6},{7,8},{8,8},{11,11}};
     Assert.assertEquals(stringifyValues(rExpected), r);
+    assertUniqueID(Table.ACIDTBL);
   }
 
   /**
@@ -1533,6 +1543,7 @@ public class TestTxnCommands2 {
     r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
     int[][] rExpected = {{2,2},{4,44},{5,5},{7,8},{11,11}};
     Assert.assertEquals(stringifyValues(rExpected), r);
+    assertUniqueID(Table.ACIDTBL);
   }
 
   /**
@@ -1559,27 +1570,34 @@ public class TestTxnCommands2 {
     int[][] rExpected = {{7,8},{11,11}};
     Assert.assertEquals(stringifyValues(rExpected), r);
   }
-  /**
-   * https://hortonworks.jira.com/browse/BUG-66580
-   * @throws Exception
-   */
-  @Ignore
   @Test
   public void testMultiInsert() throws Exception {
-    runStatementOnDriver("create table if not exists  srcpart (a int, b int, c int) " +
-      "partitioned by (z int) clustered by (a) into 2 buckets " +
-      "stored as orc tblproperties('transactional'='true')");
     runStatementOnDriver("create temporary table if not exists data1 (x int)");
-//    runStatementOnDriver("create temporary table if not exists data2 (x int)");
-
-    runStatementOnDriver("insert into data1 values (1),(2),(3)");
-//    runStatementOnDriver("insert into data2 values (4),(5),(6)");
+    runStatementOnDriver("insert into data1 values (1),(2),(1)");
     d.destroy();
     hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
     d = new Driver(hiveConf);
-    List<String> r = runStatementOnDriver(" from data1 " +
-      "insert into srcpart partition(z) select 0,0,1,x  " +
-      "insert into srcpart partition(z=1) select 0,0,1");
+
+    runStatementOnDriver(" from data1 " +
+      "insert into " + Table.ACIDTBLPART + " partition(p) select 0, 0, 'p' || x  "
+      +
+      "insert into " + Table.ACIDTBLPART + " partition(p='p1') select 0, 1");
+    /**
+     * Using {@link BucketCodec.V0} the output
+     * is missing 1 of the (p1,0,1) rows because they have the same ROW__ID and only differ by
+     * StatementId so {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger} skips one.
+     * With split update (and V0), the data is read correctly (insert deltas are now the base) but we still
+     * should get duplicate ROW__IDs.
+     */
+    List<String> r = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
+    Assert.assertEquals("[p1\t0\t0, p1\t0\t0, p1\t0\t1, p1\t0\t1, p1\t0\t1, p2\t0\t0]", r.toString());
+    assertUniqueID(Table.ACIDTBLPART);
+    /**
+     * this delete + select covers VectorizedOrcAcidRowBatchReader
+     */
+    runStatementOnDriver("delete from " + Table.ACIDTBLPART);
+    r = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
+    Assert.assertEquals("[]", r.toString());
   }
   /**
    * Investigating DP and WriteEntity, etc
@@ -1645,6 +1663,8 @@ public class TestTxnCommands2 {
     r1 = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
     String result= r1.toString();
     Assert.assertEquals("[new part\t5\t5, new part\t11\t11, p1\t1\t1, p1\t2\t15, p1\t3\t3, p2\t4\t44]", result);
+    //note: inserts go into 'new part'... so this won't fail
+    assertUniqueID(Table.ACIDTBLPART);
   }
   /**
    * Using nested partitions and thus DummyPartition
@@ -1667,6 +1687,8 @@ public class TestTxnCommands2 {
       "when not matched then insert values(s.a, s.b, 3,4)");
     r1 = runStatementOnDriver("select p,q,a,b from " + Table.ACIDNESTEDPART + " order by p,q, a, b");
     Assert.assertEquals(stringifyValues(new int[][] {{1,1,1,1},{1,1,3,3},{1,2,2,15},{1,2,4,44},{3,4,5,5},{3,4,11,11}}), r1);
+    //insert of merge lands in part (3,4) - no updates land there
+    assertUniqueID(Table.ACIDNESTEDPART);
   }
   @Ignore("Covered elsewhere")
   @Test
@@ -1703,6 +1725,41 @@ public class TestTxnCommands2 {
     Assert.assertEquals(stringifyValues(rExpected), r);
   }
 
+  @Test
+  public void testBucketCodec() throws Exception {
+    d.destroy();
+    //insert data in "legacy" format
+    hiveConf.setIntVar(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION, 0);
+    d = new Driver(hiveConf);
+
+    int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
+
+    d.destroy();
+    hiveConf.setIntVar(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION, 1);
+    d = new Driver(hiveConf);
+    //do some operations with new format
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b=11 where a in (5,7)");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(11,11)");
+    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 7");
+
+    //make sure we get the right data back before/after compactions
+    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] rExpected = {{2,1},{4,3},{5,11},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+
+    runStatementOnDriver("ALTER TABLE " + Table.ACIDTBL + " COMPACT 'MINOR'");
+    runWorker(hiveConf);
+
+    r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(rExpected), r);
+
+    runStatementOnDriver("ALTER TABLE " + Table.ACIDTBL + " COMPACT 'MAJOR'");
+    runWorker(hiveConf);
+
+    r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(rExpected), r);
+  }
   /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order
@@ -1723,7 +1780,7 @@ public class TestTxnCommands2 {
     }
     return rs;
   }
-  private static final class RowComp implements Comparator<int[]> {
+  static class RowComp implements Comparator<int[]> {
     @Override
     public int compare(int[] row1, int[] row2) {
       assert row1 != null && row2 != null && row1.length == row2.length;
@@ -1736,7 +1793,7 @@ public class TestTxnCommands2 {
       return 0;
     }
   }
-  String makeValuesClause(int[][] rows) {
+  static String makeValuesClause(int[][] rows) {
     assert rows.length > 0;
     StringBuilder sb = new StringBuilder("values");
     for(int[] row : rows) {
@@ -1767,4 +1824,19 @@ public class TestTxnCommands2 {
     d.getResults(rs);
     return rs;
   }
+  final void assertUniqueID(Table table) throws Exception {
+    String partCols = table.getPartitionColumns();
+    //check to make sure there are no duplicate ROW__IDs - HIVE-16832
+    StringBuilder sb = new StringBuilder("select ");
+    if(partCols != null && partCols.length() > 0) {
+      sb.append(partCols).append(",");
+    }
+    sb.append(" ROW__ID, count(*) from ").append(table).append(" group by ");
+    if(partCols != null && partCols.length() > 0) {
+      sb.append(partCols).append(",");
+    }
+    sb.append("ROW__ID having count(*) > 1");
+    List<String> r = runStatementOnDriver(sb.toString());
+    Assert.assertTrue("Duplicate ROW__ID: " + r.toString(),r.size() == 0);
+  }
 }
index ea5ecbc..520e958 100644 (file)
@@ -542,45 +542,4 @@ public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
     resultCount = 2;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
   }
-  @Ignore("HIVE-14947")
-  @Test
-  @Override
-  public void testDynamicPartitionsMerge() throws Exception {}
-  @Ignore("HIVE-14947")
-  @Test
-  @Override
-  public void testDynamicPartitionsMerge2() throws Exception {}
-  @Ignore("HIVE-14947")
-  @Test
-  @Override
-  public void testMerge() throws Exception {}
-
-  /**
-   * todo: remove this test once HIVE-14947 is done (parent class has a better version)
-   */
-  @Test
-  @Override
-  public void testMerge2() throws Exception {
-    int[][] baseValsOdd = {{5,5},{11,11}};
-    int[][] baseValsEven = {{2,2},{4,44}};
-    runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
-    runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(baseValsEven));
-    int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
-    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
-    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
-    Assert.assertEquals(stringifyValues(vals), r);
-    String query = "merge into " + Table.ACIDTBL +
-      " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " +
-      "WHEN MATCHED THEN UPDATE set b = source.b2 ";
-    r = runStatementOnDriver(query);
-
-    r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
-    int[][] rExpected = {{2,2},{4,44},{5,5},{7,8}};
-    Assert.assertEquals(stringifyValues(rExpected), r);
-
-  }
-  @Ignore("HIVE-14947")
-  @Test
-  @Override
-  public void testMergeWithPredicate() throws Exception {}
 }
index c928732..44ff65c 100644 (file)
@@ -114,25 +114,25 @@ public class TestAcidUtils {
     assertEquals(true, opts.isWritingBase());
     assertEquals(567, opts.getMaximumTransactionId());
     assertEquals(0, opts.getMinimumTransactionId());
-    assertEquals(123, opts.getBucket());
+    assertEquals(123, opts.getBucketId());
     opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "delta_000005_000006/bucket_00001"),
         conf);
     assertEquals(false, opts.getOldStyle());
     assertEquals(false, opts.isWritingBase());
     assertEquals(6, opts.getMaximumTransactionId());
     assertEquals(5, opts.getMinimumTransactionId());
-    assertEquals(1, opts.getBucket());
+    assertEquals(1, opts.getBucketId());
     opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "delete_delta_000005_000006/bucket_00001"),
         conf);
     assertEquals(false, opts.getOldStyle());
     assertEquals(false, opts.isWritingBase());
     assertEquals(6, opts.getMaximumTransactionId());
     assertEquals(5, opts.getMinimumTransactionId());
-    assertEquals(1, opts.getBucket());
+    assertEquals(1, opts.getBucketId());
     opts = AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "000123_0"), conf);
     assertEquals(true, opts.getOldStyle());
     assertEquals(true, opts.isWritingBase());
-    assertEquals(123, opts.getBucket());
+    assertEquals(123, opts.getBucketId());
     assertEquals(0, opts.getMinimumTransactionId());
     assertEquals(0, opts.getMaximumTransactionId());
 
index 43ed238..b004cf5 100644 (file)
@@ -81,7 +81,6 @@ import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -2445,14 +2444,14 @@ public class TestInputOutputFormat {
     assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00000",
         split.getPath().toString());
     assertEquals(0, split.getStart());
-    assertEquals(607, split.getLength());
+    assertEquals(648, split.getLength());
     split = (HiveInputFormat.HiveInputSplit) splits[1];
     assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
         split.inputFormatClassName());
     assertEquals("mock:/combinationAcid/p=0/base_0000010/bucket_00001",
         split.getPath().toString());
     assertEquals(0, split.getStart());
-    assertEquals(629, split.getLength());
+    assertEquals(674, split.getLength());
     CombineHiveInputFormat.CombineHiveInputSplit combineSplit =
         (CombineHiveInputFormat.CombineHiveInputSplit) splits[2];
     assertEquals(BUCKETS, combineSplit.getNumPaths());
@@ -3858,7 +3857,7 @@ public class TestInputOutputFormat {
     OrcStruct struct = reader.createValue();
     while (reader.next(id, struct)) {
       assertEquals("id " + record, record, id.getRowId());
-      assertEquals("bucket " + record, 0, id.getBucketId());
+      assertEquals("bucket " + record, 0, id.getBucketProperty());
       assertEquals("trans " + record, 1, id.getTransactionId());
       assertEquals("a " + record,
           42 * record, ((IntWritable) struct.getFieldValue(0)).get());
@@ -3885,7 +3884,7 @@ public class TestInputOutputFormat {
     struct = reader.createValue();
     while (reader.next(id, struct)) {
       assertEquals("id " + record, record, id.getRowId());
-      assertEquals("bucket " + record, 0, id.getBucketId());
+      assertEquals("bucket " + record, 0, id.getBucketProperty());
       assertEquals("trans " + record, 1, id.getTransactionId());
       assertEquals("a " + record,
           42 * record, ((IntWritable) struct.getFieldValue(0)).get());
index 584bd3b..2406af5 100644 (file)
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.MemoryManager;
 import org.apache.orc.StripeInformation;
@@ -38,10 +39,8 @@ import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair;
 import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey;
 import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair;
-import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.IntWritable;
@@ -192,14 +191,14 @@ public class TestOrcRawRecordMerger {
     pair.advnaceToMinKey();
     RecordReader recordReader = pair.recordReader;
     assertEquals(10, key.getTransactionId());
-    assertEquals(20, key.getBucketId());
+    assertEquals(20, key.getBucketProperty());
     assertEquals(40, key.getRowId());
     assertEquals(120, key.getCurrentTransactionId());
     assertEquals("third", value(pair.nextRecord));
 
     pair.next(pair.nextRecord);
     assertEquals(40, key.getTransactionId());
-    assertEquals(50, key.getBucketId());
+    assertEquals(50, key.getBucketProperty());
     assertEquals(60, key.getRowId());
     assertEquals(130, key.getCurrentTransactionId());
     assertEquals("fourth", value(pair.nextRecord));
@@ -219,35 +218,35 @@ public class TestOrcRawRecordMerger {
     pair.advnaceToMinKey();
     RecordReader recordReader = pair.recordReader;
     assertEquals(10, key.getTransactionId());
-    assertEquals(20, key.getBucketId());
+    assertEquals(20, key.getBucketProperty());
     assertEquals(20, key.getRowId());
     assertEquals(100, key.getCurrentTransactionId());
     assertEquals("first", value(pair.nextRecord));
 
     pair.next(pair.nextRecord);
     assertEquals(10, key.getTransactionId());
-    assertEquals(20, key.getBucketId());
+    assertEquals(20, key.getBucketProperty());
     assertEquals(30, key.getRowId());
     assertEquals(110, key.getCurrentTransactionId());
     assertEquals("second", value(pair.nextRecord));
 
     pair.next(pair.nextRecord);
     assertEquals(10, key.getTransactionId());
-    assertEquals(20, key.getBucketId());
+    assertEquals(20, key.getBucketProperty());
     assertEquals(40, key.getRowId());
     assertEquals(120, key.getCurrentTransactionId());
     assertEquals("third", value(pair.nextRecord));
 
     pair.next(pair.nextRecord);
     assertEquals(40, key.getTransactionId());
-    assertEquals(50, key.getBucketId());
+    assertEquals(50, key.getBucketProperty());
     assertEquals(60, key.getRowId());
     assertEquals(130, key.getCurrentTransactionId());
     assertEquals("fourth", value(pair.nextRecord));
 
     pair.next(pair.nextRecord);
     assertEquals(40, key.getTransactionId());
-    assertEquals(50, key.getBucketId());
+    assertEquals(50, key.getBucketProperty());
     assertEquals(61, key.getRowId());
     assertEquals(140, key.getCurrentTransactionId());
     assertEquals("fifth", value(pair.nextRecord));
@@ -302,14 +301,14 @@ public class TestOrcRawRecordMerger {
     pair.advnaceToMinKey();
     RecordReader recordReader = pair.recordReader;
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
+    assertEquals(10, key.getBucketProperty());
     assertEquals(2, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
     assertEquals("third", value(pair.nextRecord));
 
     pair.next(pair.nextRecord);
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
+    assertEquals(10, key.getBucketProperty());
     assertEquals(3, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
     assertEquals("fourth", value(pair.nextRecord));
@@ -337,35 +336,35 @@ public class TestOrcRawRecordMerger {
     pair.advnaceToMinKey();
     assertEquals("first", value(pair.nextRecord));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
+    assertEquals(10, key.getBucketProperty());
     assertEquals(0, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
     pair.next(pair.nextRecord);
     assertEquals("second", value(pair.nextRecord));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
+    assertEquals(10, key.getBucketProperty());
     assertEquals(1, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
     pair.next(pair.nextRecord);
     assertEquals("third", value(pair.nextRecord));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
+    assertEquals(10, key.getBucketProperty());
     assertEquals(2, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
     pair.next(pair.nextRecord);
     assertEquals("fourth", value(pair.nextRecord));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
+    assertEquals(10, key.getBucketProperty());
     assertEquals(3, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
     pair.next(pair.nextRecord);
     assertEquals("fifth", value(pair.nextRecord));
     assertEquals(0, key.getTransactionId());
-    assertEquals(10, key.getBucketId());
+    assertEquals(10, key.getBucketProperty());
     assertEquals(4, key.getRowId());
     assertEquals(0, key.getCurrentTransactionId());
 
@@ -448,13 +447,13 @@ public class TestOrcRawRecordMerger {
 
     assertEquals(true, merger.next(id, event));
     assertEquals(10, id.getTransactionId());
-    assertEquals(20, id.getBucketId());
+    assertEquals(20, id.getBucketProperty());
     assertEquals(40, id.getRowId());
     assertEquals("third", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(40, id.getTransactionId());
-    assertEquals(50, id.getBucketId());
+    assertEquals(50, id.getBucketProperty());
     assertEquals(60, id.getRowId());
     assertEquals("fourth", getValue(event));
 
@@ -580,6 +579,7 @@ public class TestOrcRawRecordMerger {
     // write the base
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
         .inspector(inspector).bucket(BUCKET).finalDestination(root);
+    final int BUCKET_PROPERTY = BucketCodec.V1.encode(options);
     if(!use130Format) {
       options.statementId(-1);
     }
@@ -593,11 +593,11 @@ public class TestOrcRawRecordMerger {
     // write a delta
     ru = of.getRecordUpdater(root, options.writingBase(false)
         .minimumTransactionId(200).maximumTransactionId(200).recordIdColumn(1));
-    ru.update(200, new MyRow("update 1", 0, 0, BUCKET));
-    ru.update(200, new MyRow("update 2", 2, 0, BUCKET));
-    ru.update(200, new MyRow("update 3", 3, 0, BUCKET));
-    ru.delete(200, new MyRow("", 7, 0, BUCKET));
-    ru.delete(200, new MyRow("", 8, 0, BUCKET));
+    ru.update(200, new MyRow("update 1", 0, 0, BUCKET_PROPERTY));
+    ru.update(200, new MyRow("update 2", 2, 0, BUCKET_PROPERTY));
+    ru.update(200, new MyRow("update 3", 3, 0, BUCKET_PROPERTY));
+    ru.delete(200, new MyRow("", 7, 0, BUCKET_PROPERTY));
+    ru.delete(200, new MyRow("", 8, 0, BUCKET_PROPERTY));
     ru.close(false);
 
     ValidTxnList txnList = new ValidReadTxnList("200:" + Long.MAX_VALUE);
@@ -629,64 +629,64 @@ public class TestOrcRawRecordMerger {
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
     assertEquals("update 1", getValue(event));
     assertFalse(merger.isDelete(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
     assertEquals("second", getValue(event));
     assertFalse(merger.isDelete(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 2, 200), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
     assertEquals("update 2", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 3, 200), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
     assertEquals("update 3", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 4, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 4, 0), id);
     assertEquals("fifth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 5, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 5, 0), id);
     assertEquals("sixth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 6, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 6, 0), id);
     assertEquals("seventh", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
     assertTrue(merger.isDelete(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 9, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id);
     assertEquals("tenth", getValue(event));
 
     assertEquals(false, merger.next(id, event));
@@ -700,90 +700,90 @@ public class TestOrcRawRecordMerger {
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
     assertEquals("update 1", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 0, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 0), id);
     assertEquals("first", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
     assertEquals("second", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 2, 200), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
     assertEquals("update 2", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 2, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 0), id);
     assertEquals("third", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 3, 200), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
     assertEquals("update 3", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 3, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 0), id);
     assertEquals("fourth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 4, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 4, 0), id);
     assertEquals("fifth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 5, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 5, 0), id);
     assertEquals("sixth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 6, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 6, 0), id);
     assertEquals("seventh", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 7, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 0), id);
     assertEquals("eighth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 8, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 0), id);
     assertEquals("ninth", getValue(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
-    assertEquals(new ReaderKey(0, BUCKET, 9, 0), id);
+    assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id);
     assertEquals("tenth", getValue(event));
 
     assertEquals(false, merger.next(id, event));
@@ -800,7 +800,7 @@ public class TestOrcRawRecordMerger {
       LOG.info("id = " + id + "event = " + event);
       assertEquals(OrcRecordUpdater.INSERT_OPERATION,
           OrcRecordUpdater.getOperation(event));
-      assertEquals(new ReaderKey(0, BUCKET, i, 0), id);
+      assertEquals(new ReaderKey(0, BUCKET_PROPERTY, i, 0), id);
       assertEquals(values[i], getValue(event));
     }
 
@@ -988,6 +988,9 @@ public class TestOrcRawRecordMerger {
         new OrcRecordUpdater.OrcOptions(conf)
         .writingBase(true).minimumTransactionId(0).maximumTransactionId(0)
         .bucket(BUCKET).inspector(inspector).filesystem(fs);
+
+    final int BUCKET_PROPERTY = BucketCodec.V1.encode(options);
+
     options.orcOptions(OrcFile.writerOptions(conf)
       .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE)
       .memory(mgr).batchSize(2));
@@ -1008,10 +1011,10 @@ public class TestOrcRawRecordMerger {
         "ignore.7"};
     for(int i=0; i < values.length; ++i) {
       if (values[i] != null) {
-        ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
+        ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
       }
     }
-    ru.delete(100, new BigRow(9, 0, BUCKET));
+    ru.delete(100, new BigRow(9, 0, BUCKET_PROPERTY));
     ru.close(false);
 
     // write a delta
@@ -1020,10 +1023,10 @@ public class TestOrcRawRecordMerger {
     values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
     for(int i=0; i < values.length; ++i) {
       if (values[i] != null) {
-        ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
+        ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY));
       }
     }
-    ru.delete(100, new BigRow(8, 0, BUCKET));
+    ru.delete(100, new BigRow(8, 0, BUCKET_PROPERTY));
     ru.close(false);
 
     InputFormat inf = new OrcInputFormat();
index 67c473e..be15517 100644 (file)
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -143,27 +144,27 @@ public class TestOrcRecordUpdater {
         OrcRecordUpdater.getOperation(row));
     assertEquals(11, OrcRecordUpdater.getCurrentTransaction(row));
     assertEquals(11, OrcRecordUpdater.getOriginalTransaction(row));
-    assertEquals(10, OrcRecordUpdater.getBucket(row));
+    assertEquals(10, getBucketId(row));
     assertEquals(0, OrcRecordUpdater.getRowId(row));
     assertEquals("first",
         OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
     assertEquals(true, rows.hasNext());
     row = (OrcStruct) rows.next(null);
     assertEquals(1, OrcRecordUpdater.getRowId(row));
-    assertEquals(10, OrcRecordUpdater.getBucket(row));
+    assertEquals(10, getBucketId(row));
     assertEquals("second",
         OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
     assertEquals(true, rows.hasNext());
     row = (OrcStruct) rows.next(null);
     assertEquals(2, OrcRecordUpdater.getRowId(row));
-    assertEquals(10, OrcRecordUpdater.getBucket(row));
+    assertEquals(10, getBucketId(row));
     assertEquals("third",
         OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
     assertEquals(true, rows.hasNext());
     row = (OrcStruct) rows.next(null);
     assertEquals(12, OrcRecordUpdater.getCurrentTransaction(row));
     assertEquals(12, OrcRecordUpdater.getOriginalTransaction(row));
-    assertEquals(10, OrcRecordUpdater.getBucket(row));
+    assertEquals(10, getBucketId(row));
     assertEquals(0, OrcRecordUpdater.getRowId(row));
     assertEquals("fourth",
         OrcRecordUpdater.getRow(row).getFieldValue(0).toString());
@@ -184,7 +185,11 @@ public class TestOrcRecordUpdater {
 
     assertEquals(false, fs.exists(sidePath));
   }
-
+  private static int getBucketId(OrcStruct row) {
+    int bucketValue = OrcRecordUpdater.getBucket(row);
+    return
+      BucketCodec.determineVersion(bucketValue).decodeWriterId(bucketValue);
+  }
   @Test
   public void testWriterTblProperties() throws Exception {
     Path root = new Path(workDir, "testWriterTblProperties");
index 73bc1ab..439ec9b 100644 (file)
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
@@ -72,6 +73,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
 
     DummyRow(long val, long rowId, long origTxn, int bucket) {
       field = new LongWritable(val);
+      bucket = BucketCodec.V1.encode(new AcidOutputFormat.Options(null).bucket(bucket));
       ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
     }
 
index 24f8de1..d8d59b2 100644 (file)
@@ -18,4 +18,10 @@ INSERT INTO TABLE acidTblDefault VALUES (1);
 -- Exactly one of the buckets should be selected out of the 16 buckets
 -- by the following selection query.
 EXPLAIN EXTENDED
-SELECT * FROM acidTblDefault WHERE a = 1;
\ No newline at end of file
+SELECT * FROM acidTblDefault WHERE a = 1;
+
+select count(*) from acidTblDefault WHERE a = 1;
+
+set hive.tez.bucket.pruning=false;
+
+select count(*) from acidTblDefault WHERE a = 1;
index 195278a..6ab6b43 100644 (file)
@@ -98,7 +98,7 @@ Partition Parameters:
        numFiles                2                   
        numRows                 0                   
        rawDataSize             0                   
-       totalSize               3852                
+       totalSize               3950                
 #### A masked pattern was here ####
                 
 # Storage Information           
@@ -136,9 +136,9 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: acid
-            Statistics: Num rows: 1 Data size: 3852 Basic stats: PARTIAL Column stats: NONE
+            Statistics: Num rows: 1 Data size: 3950 Basic stats: PARTIAL Column stats: NONE
             Select Operator
-              Statistics: Num rows: 1 Data size: 3852 Basic stats: PARTIAL Column stats: NONE
+              Statistics: Num rows: 1 Data size: 3950 Basic stats: PARTIAL Column stats: NONE
               Group By Operator
                 aggregations: count()
                 mode: hash
@@ -215,7 +215,7 @@ Partition Parameters:
        numFiles                2                   
        numRows                 1000                
        rawDataSize             208000              
-       totalSize               3852                
+       totalSize               3950                
 #### A masked pattern was here ####
                 
 # Storage Information           
@@ -264,7 +264,7 @@ Partition Parameters:
        numFiles                2                   
        numRows                 1000                
        rawDataSize             208000              
-       totalSize               3852                
+       totalSize               3950                
 #### A masked pattern was here ####
                 
 # Storage Information           
@@ -391,7 +391,7 @@ Partition Parameters:
        numFiles                4                   
        numRows                 1000                
        rawDataSize             208000              
-       totalSize               7718                
+       totalSize               7904                
 #### A masked pattern was here ####
                 
 # Storage Information           
@@ -440,7 +440,7 @@ Partition Parameters:
        numFiles                4                   
        numRows                 2000                
        rawDataSize             416000              
-       totalSize               7718                
+       totalSize               7904                
 #### A masked pattern was here ####
                 
 # Storage Information           
index c3ad192..fe3b9e5 100644 (file)
@@ -201,7 +201,7 @@ Table Parameters:
        numFiles                2                   
        numRows                 0                   
        rawDataSize             0                   
-       totalSize               1724                
+       totalSize               1798                
        transactional           true                
 #### A masked pattern was here ####
                 
@@ -244,7 +244,7 @@ Table Parameters:
        numFiles                4                   
        numRows                 0                   
        rawDataSize             0                   
-       totalSize               2763                
+       totalSize               2909                
        transactional           true                
 #### A masked pattern was here ####
                 
index bcf33d4..6df425f 100644 (file)
@@ -171,7 +171,7 @@ Table Parameters:
        numFiles                1                   
        numRows                 0                   
        rawDataSize             0                   
-       totalSize               295426              
+       totalSize               295483              
        transactional           true                
 #### A masked pattern was here ####
                 
@@ -199,9 +199,9 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: acid_ivot
-            Statistics: Num rows: 1 Data size: 295426 Basic stats: COMPLETE Column stats: COMPLETE
+            Statistics: Num rows: 1 Data size: 295483 Basic stats: COMPLETE Column stats: COMPLETE
             Select Operator
-              Statistics: Num rows: 1 Data size: 295426 Basic stats: COMPLETE Column stats: COMPLETE
+              Statistics: Num rows: 1 Data size: 295483 Basic stats: COMPLETE Column stats: COMPLETE
               Group By Operator
                 aggregations: count()
                 mode: hash
@@ -364,7 +364,7 @@ Table Parameters:
        numFiles                1                   
        numRows                 0                   
        rawDataSize             0                   
-       totalSize               1512                
+       totalSize               1554                
        transactional           true                
 #### A masked pattern was here ####
                 
@@ -392,9 +392,9 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: acid_ivot
-            Statistics: Num rows: 1 Data size: 1512 Basic stats: COMPLETE Column stats: COMPLETE
+            Statistics: Num rows: 1 Data size: 1554 Basic stats: COMPLETE Column stats: COMPLETE
             Select Operator
-              Statistics: Num rows: 1 Data size: 1512 Basic stats: COMPLETE Column stats: COMPLETE
+              Statistics: Num rows: 1 Data size: 1554 Basic stats: COMPLETE Column stats: COMPLETE
               Group By Operator
                 aggregations: count()
                 mode: hash
@@ -486,7 +486,7 @@ Table Parameters:
        numFiles                2                   
        numRows                 0                   
        rawDataSize             0                   
-       totalSize               3024                
+       totalSize               3109                
        transactional           true                
 #### A masked pattern was here ####
                 
@@ -514,9 +514,9 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: acid_ivot
-            Statistics: Num rows: 1 Data size: 3024 Basic stats: COMPLETE Column stats: COMPLETE
+            Statistics: Num rows: 1 Data size: 3109 Basic stats: COMPLETE Column stats: COMPLETE
             Select Operator
-              Statistics: Num rows: 1 Data size: 3024 Basic stats: COMPLETE Column stats: COMPLETE
+              Statistics: Num rows: 1 Data size: 3109 Basic stats: COMPLETE Column stats: COMPLETE
               Group By Operator
                 aggregations: count()
                 mode: hash
@@ -606,7 +606,7 @@ Table Parameters:
        numFiles                3                   
        numRows                 0                   
        rawDataSize             0                   
-       totalSize               298450              
+       totalSize               298592              
        transactional           true                
 #### A masked pattern was here ####
                 
@@ -634,9 +634,9 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: acid_ivot
-            Statistics: Num rows: 1 Data size: 298450 Basic stats: COMPLETE Column stats: COMPLETE
+            Statistics: Num rows: 1 Data size: 298592 Basic stats: COMPLETE Column stats: COMPLETE
             Select Operator
-              Statistics: Num rows: 1 Data size: 298450 Basic stats: COMPLETE Column stats: COMPLETE
+              Statistics: Num rows: 1 Data size: 298592 Basic stats: COMPLETE Column stats: COMPLETE
               Group By Operator
                 aggregations: count()
                 mode: hash
index 357ae7b..97f8d6b 100644 (file)
@@ -43,22 +43,22 @@ STAGE PLANS:
                   alias: acidtbldefault
                   filterExpr: (a = 1) (type: boolean)
                   buckets included: [1,] of 16
-                  Statistics: Num rows: 7972 Data size: 31888 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 8143 Data size: 32572 Basic stats: COMPLETE Column stats: NONE
                   GatherStats: false
                   Filter Operator
                     isSamplingPred: false
                     predicate: (a = 1) (type: boolean)
-                    Statistics: Num rows: 3986 Data size: 15944 Basic stats: COMPLETE Column stats: NONE
+                    Statistics: Num rows: 4071 Data size: 16284 Basic stats: COMPLETE Column stats: NONE
                     Select Operator
                       expressions: 1 (type: int)
                       outputColumnNames: _col0
-                      Statistics: Num rows: 3986 Data size: 15944 Basic stats: COMPLETE Column stats: NONE
+                      Statistics: Num rows: 4071 Data size: 16284 Basic stats: COMPLETE Column stats: NONE
                       File Output Operator
                         compressed: false
                         GlobalTableId: 0
 #### A masked pattern was here ####
                         NumFilesPerFileSink: 1
-                        Statistics: Num rows: 3986 Data size: 15944 Basic stats: COMPLETE Column stats: NONE
+                        Statistics: Num rows: 4071 Data size: 16284 Basic stats: COMPLETE Column stats: NONE
 #### A masked pattern was here ####
                         table:
                             input format: org.apache.hadoop.mapred.SequenceFileInputFormat
@@ -100,7 +100,7 @@ STAGE PLANS:
                     serialization.ddl struct acidtbldefault { i32 a}
                     serialization.format 1
                     serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                    totalSize 31888
+                    totalSize 32572
                     transactional true
                     transactional_properties default
 #### A masked pattern was here ####
@@ -123,7 +123,7 @@ STAGE PLANS:
                       serialization.ddl struct acidtbldefault { i32 a}
                       serialization.format 1
                       serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                      totalSize 31888
+                      totalSize 32572
                       transactional true
                       transactional_properties default
 #### A masked pattern was here ####
@@ -139,3 +139,21 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
+PREHOOK: query: select count(*) from acidTblDefault WHERE a = 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acidtbldefault
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from acidTblDefault WHERE a = 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acidtbldefault
+#### A masked pattern was here ####
+1
+PREHOOK: query: select count(*) from acidTblDefault WHERE a = 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acidtbldefault
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from acidTblDefault WHERE a = 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acidtbldefault
+#### A masked pattern was here ####
+1
index 43c9b60..059ace9 100644 (file)
@@ -56,23 +56,23 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: hello_acid
-            Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE
+            Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
             Select Operator
               expressions: ROW__ID.transactionid (type: bigint)
               outputColumnNames: _col0
-              Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE
+              Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
               Reduce Output Operator
                 key expressions: _col0 (type: bigint)
                 sort order: +
-                Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE
+                Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
       Reduce Operator Tree:
         Select Operator
           expressions: KEY.reducesinkkey0 (type: bigint)
           outputColumnNames: _col0
-          Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE
+          Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
           File Output Operator
             compressed: false
-            Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
             table:
                 input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                 output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
@@ -117,17 +117,17 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             alias: hello_acid
-            Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE
+            Statistics: Num rows: 1 Data size: 3054 Basic stats: PARTIAL Column stats: NONE
             Filter Operator
               predicate: (ROW__ID.transactionid = 3) (type: boolean)
-              Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE
+              Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
               Select Operator
                 expressions: ROW__ID.transactionid (type: bigint)
                 outputColumnNames: _col0
-                Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
                 File Output Operator
                   compressed: false
-                  Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 1 Data size: 3054 Basic stats: COMPLETE Column stats: NONE
                   table:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat