HIVE-14813 Make TransactionBatchImpl.toString() include state of each txn: commit...
authorEugene Koifman <ekoifman@hortonworks.com>
Wed, 27 Sep 2017 22:28:34 +0000 (15:28 -0700)
committerEugene Koifman <ekoifman@hortonworks.com>
Wed, 27 Sep 2017 22:28:34 +0000 (15:28 -0700)
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/TransactionBatch.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java

index 28c98bd..db3109e 100644 (file)
@@ -19,9 +19,9 @@
 package org.apache.hive.hcatalog.streaming;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.cli.CliSessionState;
@@ -577,6 +577,14 @@ public class HiveEndPoint {
      */
     private volatile boolean isClosed = false;
     private final String agentInfo;
+    /**
+     * Tracks the state of each transaction
+     */
+    private final TxnState[] txnStatus;
+    /**
+     * ID of the last txn used by {@link #beginNextTransactionImpl()}
+     */
+    private long lastTxnUsed;
 
     /**
      * Represents a batch of transactions acquired from MetaStore
@@ -606,6 +614,10 @@ public class HiveEndPoint {
         this.agentInfo = agentInfo;
 
         txnIds = openTxnImpl(msClient, user, numTxns, ugi);
+        txnStatus = new TxnState[numTxns];
+        for(int i = 0; i < txnStatus.length; i++) {
+          txnStatus[i] = TxnState.OPEN;//Open matches Metastore state
+        }
 
         this.state = TxnState.INACTIVE;
         recordWriter.newBatch(txnIds.get(0), txnIds.get(txnIds.size()-1));
@@ -639,8 +651,14 @@ public class HiveEndPoint {
       if (txnIds==null || txnIds.isEmpty()) {
         return "{}";
       }
+      StringBuilder sb = new StringBuilder(" TxnStatus[");
+      for(TxnState state : txnStatus) {
+        //'state' should not be null - future proofing
+        sb.append(state == null ? "N" : state);
+      }
+      sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
       return "TxnIds=[" + txnIds.get(0) + "..." + txnIds.get(txnIds.size()-1)
-              + "] on endPoint = " + endPt;
+              + "] on endPoint = " + endPt + "; " + sb;
     }
 
     /**
@@ -678,6 +696,7 @@ public class HiveEndPoint {
                 " current batch for end point : " + endPt);
       ++currentTxnIndex;
       state = TxnState.OPEN;
+      lastTxnUsed = getCurrentTxnId();
       lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId(), agentInfo);
       try {
         LockResponse res = msClient.lock(lockRequest);
@@ -862,6 +881,7 @@ public class HiveEndPoint {
         recordWriter.flush();
         msClient.commitTxn(txnIds.get(currentTxnIndex));
         state = TxnState.COMMITTED;
+        txnStatus[currentTxnIndex] = TxnState.COMMITTED;
       } catch (NoSuchTxnException e) {
         throw new TransactionError("Invalid transaction id : "
                 + getCurrentTxnId(), e);
@@ -924,12 +944,14 @@ public class HiveEndPoint {
           for(currentTxnIndex = minOpenTxnIndex;
               currentTxnIndex < txnIds.size(); currentTxnIndex++) {
             msClient.rollbackTxn(txnIds.get(currentTxnIndex));
+            txnStatus[currentTxnIndex] = TxnState.ABORTED;
           }
           currentTxnIndex--;//since the loop left it == txnId.size()
         }
         else {
           if (getCurrentTxnId() > 0) {
             msClient.rollbackTxn(getCurrentTxnId());
+            txnStatus[currentTxnIndex] = TxnState.ABORTED;
           }
         }
         state = TxnState.ABORTED;
index 3bcc510..372ad2d 100644 (file)
@@ -33,7 +33,17 @@ import java.util.Collection;
  *
  */
 public interface TransactionBatch  {
-  public enum TxnState {INACTIVE, OPEN, COMMITTED, ABORTED }
+  enum TxnState {
+    INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A");
+
+    private final String code;
+    TxnState(String code) {
+      this.code = code;
+    };
+    public String toString() {
+      return code;
+    }
+  }
 
   /**
    * Activate the next available transaction in the current transaction batch
index 1e73a4b..921bbd3 100644 (file)
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.Validator;
@@ -1974,7 +1975,12 @@ public class TestStreaming {
     txnBatch.write("name4,2,more Streaming unlimited".getBytes());
     txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
     txnBatch.commit();
-    
+
+    //test toString()
+    String s = txnBatch.toString();
+    Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId())));
+    Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CO]"));
+
     expectedEx = null;
     txnBatch.beginNextTransaction();
     writer.enableErrors();
@@ -1998,6 +2004,11 @@ public class TestStreaming {
     Assert.assertTrue("commit() should have failed",
       expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
 
+    //test toString()
+    s = txnBatch.toString();
+    Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId())));
+    Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CA]"));
+
     r = msClient.showTxns();
     Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
     ti = r.getOpen_txns();