HIVE-19210: Create separate module for streaming ingest (Prasanth Jayachandran review...
authorPrasanth Jayachandran <prasanthj@apache.org>
Mon, 16 Apr 2018 06:34:12 +0000 (23:34 -0700)
committerPrasanth Jayachandran <prasanthj@apache.org>
Mon, 16 Apr 2018 06:34:12 +0000 (23:34 -0700)
32 files changed:
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java [new file with mode: 0644]
itests/hive-unit/pom.xml
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
packaging/pom.xml
packaging/src/main/assembly/src.xml
pom.xml
streaming/pom.xml [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/ConnectionError.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/InvalidColumn.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/InvalidPartition.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/InvalidTable.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/QueryFailedException.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/RecordWriter.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/SerializationError.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/StreamingConnection.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/StreamingException.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/TransactionBatch.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/TransactionError.java [new file with mode: 0644]
streaming/src/java/org/apache/hive/streaming/package.html [new file with mode: 0644]
streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java [new file with mode: 0644]
streaming/src/test/org/apache/hive/streaming/TestStreaming.java [new file with mode: 0644]

diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package-info.java
new file mode 100644 (file)
index 0000000..36d6b13
--- /dev/null
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@Deprecated // use org.apache.hive.streaming instead
+package org.apache.hive.hcatalog.streaming;
\ No newline at end of file
index 05c362e..3ae7f2f 100644 (file)
@@ -76,8 +76,8 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.hive.hcatalog</groupId>
-      <artifactId>hive-hcatalog-streaming</artifactId>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-streaming</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
index 5966740..b19aa23 100644 (file)
@@ -70,11 +70,11 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
-import org.apache.hive.hcatalog.streaming.HiveEndPoint;
-import org.apache.hive.hcatalog.streaming.StreamingConnection;
-import org.apache.hive.hcatalog.streaming.StreamingException;
-import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.apache.hive.streaming.DelimitedInputWriter;
+import org.apache.hive.streaming.HiveEndPoint;
+import org.apache.hive.streaming.StreamingConnection;
+import org.apache.hive.streaming.StreamingException;
+import org.apache.hive.streaming.TransactionBatch;
 import org.apache.orc.OrcConf;
 import org.junit.After;
 import org.junit.Assert;
index e2d61bd..fe1aac8 100644 (file)
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-streaming</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hive.hcatalog</groupId>
       <artifactId>hive-hcatalog-streaming</artifactId>
       <version>${project.version}</version>
index 486fe52..c477194 100644 (file)
@@ -97,6 +97,7 @@
         <include>spark-client/**/*</include>
         <include>storage-api/**/*</include>
         <include>standalone-metastore/**/*</include>
+        <include>streaming/**/*</include>
         <include>testutils/**/*</include>
         <include>vector-code-gen/**/*</include>
         <include>kryo-registrator/**/*</include>
diff --git a/pom.xml b/pom.xml
index 2d30789..6c43181 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@
     <module>serde</module>
     <module>service-rpc</module>
     <module>service</module>
+    <module>streaming</module>
     <module>llap-common</module>
     <module>llap-client</module>
     <module>llap-ext-client</module>
diff --git a/streaming/pom.xml b/streaming/pom.xml
new file mode 100644 (file)
index 0000000..b58ec01
--- /dev/null
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>3.1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>hive-streaming</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Streaming</name>
+
+  <properties>
+    <hive.path.to.root>..</hive.path.to.root>
+  </properties>
+
+  <dependencies>
+    <!-- dependencies are always listed in sorted order by groupId, artifectId -->
+    <!-- intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-serde</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <optional>true</optional>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <optional>true</optional>
+      <version>3.3.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <optional>true</optional>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <optional>true</optional>
+      <version>${hadoop.version}</version>
+             <exclusions>
+            <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commmons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+        </exclusions>
+    </dependency>
+
+    <!-- test -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <scope>test</scope>
+      <version>${hadoop.version}</version>
+             <exclusions>
+            <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commmons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+          </exclusion>
+        </exclusions>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <sourceDirectory>${basedir}/src/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <resources>
+    </resources>
+    <plugins>
+      <!-- plugins are always listed in sorted order by groupId, artifectId -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
new file mode 100644 (file)
index 0000000..25998ae
--- /dev/null
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+
+
+public abstract class AbstractRecordWriter implements RecordWriter {
+  static final private Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
+
+  private final HiveConf conf;
+  private final HiveEndPoint endPoint;
+  final Table tbl;
+
+  private final IMetaStoreClient msClient;
+  final List<Integer> bucketIds;
+  private ArrayList<RecordUpdater> updaters = null;
+
+  private final int totalBuckets;
+  /**
+   * Indicates whether target table is bucketed
+   */
+  private final boolean isBucketed;
+
+  private final Path partitionPath;
+
+  private final AcidOutputFormat<?,?> outf;
+  private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
+  private Long curBatchMinWriteId;
+  private Long curBatchMaxWriteId;
+
+  private static final class TableWriterPair {
+    private final Table tbl;
+    private final Path partitionPath;
+    TableWriterPair(Table t, Path p) {
+      tbl = t;
+      partitionPath = p;
+    }
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #AbstractRecordWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
+    throws ConnectionError, StreamingException {
+    this(endPoint, conf, null);
+  }
+  protected AbstractRecordWriter(HiveEndPoint endPoint2, HiveConf conf, StreamingConnection conn)
+          throws StreamingException {
+    this.endPoint = endPoint2;
+    this.conf = conf!=null ? conf
+                : HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri);
+    try {
+      msClient = HCatUtil.getHiveMetastoreClient(this.conf);
+      UserGroupInformation ugi = conn != null ? conn.getUserGroupInformation() : null;
+      if (ugi == null) {
+        this.tbl = msClient.getTable(endPoint.database, endPoint.table);
+        this.partitionPath = getPathForEndPoint(msClient, endPoint);
+      } else {
+        TableWriterPair twp = ugi.doAs(
+          new PrivilegedExceptionAction<TableWriterPair>() {
+            @Override
+            public TableWriterPair run() throws Exception {
+              return new TableWriterPair(msClient.getTable(endPoint.database, endPoint.table),
+                getPathForEndPoint(msClient, endPoint));
+            }
+          });
+        this.tbl = twp.tbl;
+        this.partitionPath = twp.partitionPath;
+      }
+      this.isBucketed = tbl.getSd().getNumBuckets() > 0;
+      /**
+       *  For unbucketed tables we have exactly 1 RecrodUpdater for each AbstractRecordWriter which
+       *  ends up writing to a file bucket_000000
+       * See also {@link #getBucket(Object)}
+       */
+      this.totalBuckets = isBucketed ? tbl.getSd().getNumBuckets() : 1;
+      if(isBucketed) {
+        this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
+        this.bucketFieldData = new Object[bucketIds.size()];
+      }
+      else {
+        bucketIds = Collections.emptyList();
+      }
+      String outFormatName = this.tbl.getSd().getOutputFormat();
+      outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
+    } catch(InterruptedException e) {
+      throw new StreamingException(endPoint2.toString(), e);
+    } catch (MetaException | NoSuchObjectException e) {
+      throw new ConnectionError(endPoint2, e);
+    } catch (TException | ClassNotFoundException | IOException e) {
+      throw new StreamingException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * used to tag error msgs to provied some breadcrumbs
+   */
+  String getWatermark() {
+    return partitionPath + " writeIds[" + curBatchMinWriteId + "," + curBatchMaxWriteId + "]";
+  }
+  // return the column numbers of the bucketed columns
+  private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
+    ArrayList<Integer> result =  new ArrayList<Integer>(bucketCols.size());
+    HashSet<String> bucketSet = new HashSet<String>(bucketCols);
+    for (int i = 0; i < cols.size(); i++) {
+      if( bucketSet.contains(cols.get(i).getName()) ) {
+        result.add(i);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Get the SerDe for the Objects created by {@link #encode}.  This is public so that test
+   * frameworks can use it.
+   * @return serde
+   * @throws SerializationError
+   */
+  public abstract AbstractSerDe getSerde() throws SerializationError;
+
+  /**
+   * Encode a record as an Object that Hive can read with the ObjectInspector associated with the
+   * serde returned by {@link #getSerde}.  This is public so that test frameworks can use it.
+   * @param record record to be deserialized
+   * @return deserialized record as an Object
+   * @throws SerializationError
+   */
+  public abstract Object encode(byte[] record) throws SerializationError;
+
+  protected abstract ObjectInspector[] getBucketObjectInspectors();
+  protected abstract StructObjectInspector getRecordObjectInspector();
+  protected abstract StructField[] getBucketStructFields();
+
+  // returns the bucket number to which the record belongs to
+  protected int getBucket(Object row) throws SerializationError {
+    if(!isBucketed) {
+      return 0;
+    }
+    ObjectInspector[] inspectors = getBucketObjectInspectors();
+    Object[] bucketFields = getBucketFields(row);
+    return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets);
+  }
+
+  @Override
+  public void flush() throws StreamingIOFailure {
+    try {
+      for (RecordUpdater updater : updaters) {
+        if (updater != null) {
+          updater.flush();
+        }
+      }
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Unable to flush recordUpdater", e);
+    }
+  }
+
+  @Override
+  public void clear() throws StreamingIOFailure {
+  }
+
+  /**
+   * Creates a new record updater for the new batch
+   * @param minWriteId smallest writeid in the batch
+   * @param maxWriteID largest writeid in the batch
+   * @throws StreamingIOFailure if failed to create record updater
+   */
+  @Override
+  public void newBatch(Long minWriteId, Long maxWriteID)
+          throws StreamingIOFailure, SerializationError {
+    curBatchMinWriteId = minWriteId;
+    curBatchMaxWriteId = maxWriteID;
+    updaters = new ArrayList<RecordUpdater>(totalBuckets);
+    for (int bucket = 0; bucket < totalBuckets; bucket++) {
+      updaters.add(bucket, null);//so that get(i) returns null rather than ArrayOutOfBounds
+    }
+  }
+
+  @Override
+  public void closeBatch() throws StreamingIOFailure {
+    boolean haveError = false;
+    for (RecordUpdater updater : updaters) {
+      if (updater != null) {
+        try {
+          //try not to leave any files open
+          updater.close(false);
+        } catch (Exception ex) {
+          haveError = true;
+          LOG.error("Unable to close " + updater + " due to: " + ex.getMessage(), ex);
+        }
+      }
+    }
+    updaters.clear();
+    if(haveError) {
+      throw new StreamingIOFailure("Encountered errors while closing (see logs) " + getWatermark());
+    }
+  }
+
+  protected static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
+          , StructObjectInspector recordObjInspector)
+          throws SerializationError {
+    ObjectInspector[] result = new ObjectInspector[bucketIds.size()];
+
+    for (int i = 0; i < bucketIds.size(); i++) {
+      int bucketId = bucketIds.get(i);
+      result[i] =
+              recordObjInspector.getAllStructFieldRefs().get( bucketId ).getFieldObjectInspector();
+    }
+    return result;
+  }
+
+
+  private Object[] getBucketFields(Object row) throws SerializationError {
+    StructObjectInspector recordObjInspector = getRecordObjectInspector();
+    StructField[] bucketStructFields = getBucketStructFields();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketFieldData[i] = recordObjInspector.getStructFieldData(row,  bucketStructFields[i]);
+    }
+    return bucketFieldData;
+  }
+
+  private RecordUpdater createRecordUpdater(int bucketId, Long minWriteId, Long maxWriteID)
+          throws IOException, SerializationError {
+    try {
+      // Initialize table properties from the table parameters. This is required because the table
+      // may define certain table parameters that may be required while writing. The table parameter
+      // 'transactional_properties' is one such example.
+      Properties tblProperties = new Properties();
+      tblProperties.putAll(tbl.getParameters());
+      return  outf.getRecordUpdater(partitionPath,
+              new AcidOutputFormat.Options(conf)
+                      .inspector(getSerde().getObjectInspector())
+                      .bucket(bucketId)
+                      .tableProperties(tblProperties)
+                      .minimumWriteId(minWriteId)
+                      .maximumWriteId(maxWriteID)
+                      .statementId(-1)
+                      .finalDestination(partitionPath));
+    } catch (SerDeException e) {
+      throw new SerializationError("Failed to get object inspector from Serde "
+              + getSerde().getClass().getName(), e);
+    }
+  }
+
+  RecordUpdater getRecordUpdater(int bucketId) throws StreamingIOFailure, SerializationError {
+    RecordUpdater recordUpdater = updaters.get(bucketId);
+    if (recordUpdater == null) {
+      try {
+        recordUpdater = createRecordUpdater(bucketId, curBatchMinWriteId, curBatchMaxWriteId);
+      } catch (IOException e) {
+        String errMsg = "Failed creating RecordUpdater for " + getWatermark();
+        LOG.error(errMsg, e);
+        throw new StreamingIOFailure(errMsg, e);
+      }
+      updaters.set(bucketId, recordUpdater);
+    }
+    return recordUpdater;
+  }
+
+  private Path getPathForEndPoint(IMetaStoreClient msClient, HiveEndPoint endPoint)
+          throws StreamingException {
+    try {
+      String location;
+      if(endPoint.partitionVals==null || endPoint.partitionVals.isEmpty() ) {
+        location = msClient.getTable(endPoint.database,endPoint.table)
+                .getSd().getLocation();
+      } else {
+        location = msClient.getPartition(endPoint.database, endPoint.table,
+                endPoint.partitionVals).getSd().getLocation();
+      }
+      return new Path(location);
+    } catch (TException e) {
+      throw new StreamingException(e.getMessage()
+              + ". Unable to get path for end point: "
+              + endPoint.partitionVals, e);
+    }
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/ConnectionError.java b/streaming/src/java/org/apache/hive/streaming/ConnectionError.java
new file mode 100644 (file)
index 0000000..668bffb
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class ConnectionError extends StreamingException {
+
+  public ConnectionError(String msg) {
+    super(msg);
+  }
+
+  public ConnectionError(String msg, Exception innerEx) {
+    super(msg, innerEx);
+  }
+
+  public ConnectionError(HiveEndPoint endPoint, Exception innerEx) {
+    super("Error connecting to " + endPoint +
+        (innerEx == null ? "" : ": " + innerEx.getMessage()), innerEx);
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java b/streaming/src/java/org/apache/hive/streaming/DelimitedInputWriter.java
new file mode 100644 (file)
index 0000000..898b3f9
--- /dev/null
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Streaming Writer handles delimited input (eg. CSV).
+ * Delimited input is parsed & reordered to match column order in table
+ * Uses Lazy Simple Serde to process delimited input
+ */
+public class DelimitedInputWriter extends AbstractRecordWriter {
+  private final boolean reorderingNeeded;
+  private String delimiter;
+  private char serdeSeparator;
+  private int[] fieldToColMapping;
+  private final ArrayList<String> tableColumns;
+  private LazySimpleSerDe serde = null;
+
+  private final LazySimpleStructObjectInspector recordObjInspector;
+  private final ObjectInspector[] bucketObjInspectors;
+  private final StructField[] bucketStructFields;
+
+  static final private Logger LOG = LoggerFactory.getLogger(DelimitedInputWriter.class.getName());
+
+  /** Constructor. Uses default separator of the LazySimpleSerde
+   * @param colNamesForFields Column name assignment for input fields. nulls or empty
+   *                          strings in the array indicates the fields to be skipped
+   * @param delimiter input field delimiter
+   * @param endPoint Hive endpoint
+   * @throws ConnectionError Problem talking to Hive
+   * @throws ClassNotFoundException Serde class not found
+   * @throws SerializationError Serde initialization/interaction failed
+   * @throws StreamingException Problem acquiring file system path for partition
+   * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, StreamingConnection conn)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+      InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, null, conn);
+  }
+ /** Constructor. Uses default separator of the LazySimpleSerde
+  * @param colNamesForFields Column name assignment for input fields. nulls or empty
+  *                          strings in the array indicates the fields to be skipped
+  * @param delimiter input field delimiter
+  * @param endPoint Hive endpoint
+  * @param conf a Hive conf object. Can be null if not using advanced hive settings.
+  * @throws ConnectionError Problem talking to Hive
+  * @throws ClassNotFoundException Serde class not found
+  * @throws SerializationError Serde initialization/interaction failed
+  * @throws StreamingException Problem acquiring file system path for partition
+  * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
+  */
+   public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+          throws ClassNotFoundException, ConnectionError, SerializationError,
+                 InvalidColumn, StreamingException {
+     this(colNamesForFields, delimiter, endPoint, conf,
+       (char) LazySerDeParameters.DefaultSeparators[0], conn);
+   }
+  /**
+   * Constructor. Allows overriding separator of the LazySimpleSerde
+   * @param colNamesForFields Column name assignment for input fields
+   * @param delimiter input field delimiter
+   * @param endPoint Hive endpoint
+   * @param conf a Hive conf object. Set to null if not using advanced hive settings.
+   * @param serdeSeparator separator used when encoding data that is fed into the
+   *                             LazySimpleSerde. Ensure this separator does not occur
+   *                             in the field data
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError Problem talking to Hive
+   * @throws ClassNotFoundException Serde class not found
+   * @throws SerializationError Serde initialization/interaction failed
+   * @throws StreamingException Problem acquiring file system path for partition
+   * @throws InvalidColumn any element in colNamesForFields refers to a non existing column
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator, StreamingConnection conn)
+          throws ClassNotFoundException, ConnectionError, SerializationError,
+                 InvalidColumn, StreamingException {
+    super(endPoint, conf, conn);
+    this.tableColumns = getCols(tbl);
+    this.serdeSeparator = serdeSeparator;
+    this.delimiter = delimiter;
+    this.fieldToColMapping = getFieldReordering(colNamesForFields, getTableColumns());
+    this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns());
+    LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint);
+    this.serdeSeparator = serdeSeparator;
+    this.serde = createSerde(tbl, conf, serdeSeparator);
+
+    // get ObjInspectors for entire record and bucketed cols
+    try {
+      this.recordObjInspector = (LazySimpleStructObjectInspector) serde.getObjectInspector();
+      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+    }
+
+    // get StructFields for bucketed cols
+    bucketStructFields = new StructField[bucketIds.size()];
+    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketStructFields[i] = allFields.get(bucketIds.get(i));
+    }
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+    InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, null, null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf)
+    throws ClassNotFoundException, ConnectionError, SerializationError,
+    InvalidColumn, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, conf,
+      (char) LazySerDeParameters.DefaultSeparators[0], null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #DelimitedInputWriter(String[], String, HiveEndPoint, HiveConf, char, StreamingConnection)}
+   */
+  public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
+                              HiveEndPoint endPoint, HiveConf conf, char serdeSeparator)
+    throws ClassNotFoundException, StreamingException {
+    this(colNamesForFields, delimiter, endPoint, conf, serdeSeparator, null);
+  }
+
+  private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
+    return !( delimiter.equals(String.valueOf(getSerdeSeparator()))
+            && areFieldsInColOrder(fieldToColMapping)
+            && tableColumns.size()>=fieldToColMapping.length );
+  }
+
+  private static boolean areFieldsInColOrder(int[] fieldToColMapping) {
+    for(int i=0; i<fieldToColMapping.length; ++i) {
+      if(fieldToColMapping[i]!=i) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @VisibleForTesting
+  static int[] getFieldReordering(String[] colNamesForFields, List<String> tableColNames)
+          throws InvalidColumn {
+    int[] result = new int[ colNamesForFields.length ];
+    for(int i=0; i<colNamesForFields.length; ++i) {
+      result[i] = -1;
+    }
+    int i=-1, fieldLabelCount=0;
+    for( String col : colNamesForFields ) {
+      ++i;
+      if(col == null) {
+        continue;
+      }
+      if( col.trim().isEmpty() ) {
+        continue;
+      }
+      ++fieldLabelCount;
+      int loc = tableColNames.indexOf(col);
+      if(loc == -1) {
+        throw new InvalidColumn("Column '" + col + "' not found in table for input field " + i+1);
+      }
+      result[i] = loc;
+    }
+    if(fieldLabelCount>tableColNames.size()) {
+      throw new InvalidColumn("Number of field names exceeds the number of columns in table");
+    }
+    return result;
+  }
+
+  // Reorder fields in record based on the order of columns in the table
+  protected byte[] reorderFields(byte[] record) throws UnsupportedEncodingException {
+    if(!reorderingNeeded) {
+      return record;
+    }
+    String[] reorderedFields = new String[getTableColumns().size()];
+    String decoded = new String(record);
+    String[] fields = decoded.split(delimiter,-1);
+    for (int i=0; i<fieldToColMapping.length; ++i) {
+      int newIndex = fieldToColMapping[i];
+      if(newIndex != -1) {
+        reorderedFields[newIndex] = fields[i];
+      }
+    }
+    return join(reorderedFields, getSerdeSeparator());
+  }
+
+  // handles nulls in items[]
+  // TODO: perhaps can be made more efficient by creating a byte[] directly
+  private static byte[] join(String[] items, char separator) {
+    StringBuilder buff = new StringBuilder(100);
+    if(items.length == 0)
+      return "".getBytes();
+    int i=0;
+    for(; i<items.length-1; ++i) {
+      if(items[i]!=null) {
+        buff.append(items[i]);
+      }
+      buff.append(separator);
+    }
+    if(items[i]!=null) {
+      buff.append(items[i]);
+    }
+    return buff.toString().getBytes();
+  }
+
+  protected ArrayList<String> getTableColumns() {
+    return tableColumns;
+  }
+
+  @Override
+  public void write(long writeId, byte[] record)
+          throws SerializationError, StreamingIOFailure {
+    try {
+      byte[] orderedFields = reorderFields(record);
+      Object encodedRow = encode(orderedFields);
+      int bucket = getBucket(encodedRow);
+      getRecordUpdater(bucket).insert(writeId, encodedRow);
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error writing record in transaction write id ("
+              + writeId + ")", e);
+    }
+  }
+
+  @Override
+  public AbstractSerDe getSerde() {
+    return serde;
+  }
+
+  protected LazySimpleStructObjectInspector getRecordObjectInspector() {
+    return recordObjInspector;
+  }
+
+  @Override
+  protected StructField[] getBucketStructFields() {
+    return bucketStructFields;
+  }
+
+  protected ObjectInspector[] getBucketObjectInspectors() {
+    return bucketObjInspectors;
+  }
+
+  @Override
+  public Object encode(byte[] record) throws SerializationError {
+    try {
+      BytesWritable blob = new BytesWritable();
+      blob.set(record, 0, record.length);
+      return serde.deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into Object", e);
+    }
+  }
+
+  /**
+   * Creates LazySimpleSerde
+   * @return
+   * @throws SerializationError if serde could not be initialized
+   * @param tbl
+   */
+  protected static LazySimpleSerDe createSerde(Table tbl, HiveConf conf, char serdeSeparator)
+          throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      tableProps.setProperty("field.delim", String.valueOf(serdeSeparator));
+      LazySimpleSerDe serde = new LazySimpleSerDe();
+      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde", e);
+    }
+  }
+
+  private ArrayList<String> getCols(Table table) {
+    List<FieldSchema> cols = table.getSd().getCols();
+    ArrayList<String> colNames = new ArrayList<String>(cols.size());
+    for (FieldSchema col : cols) {
+      colNames.add(col.getName().toLowerCase());
+    }
+    return  colNames;
+  }
+
+  public char getSerdeSeparator() {
+    return serdeSeparator;
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java b/streaming/src/java/org/apache/hive/streaming/HeartBeatFailure.java
new file mode 100644 (file)
index 0000000..b1f9520
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.util.Collection;
+import java.util.Set;
+
+public class HeartBeatFailure extends StreamingException {
+  private Collection<Long> abortedTxns;
+  private Collection<Long> nosuchTxns;
+
+  public HeartBeatFailure(Collection<Long> abortedTxns, Set<Long> nosuchTxns) {
+    super("Heart beat error. InvalidTxns: " + nosuchTxns + ". AbortedTxns: " + abortedTxns);
+    this.abortedTxns = abortedTxns;
+    this.nosuchTxns = nosuchTxns;
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java b/streaming/src/java/org/apache/hive/streaming/HiveEndPoint.java
new file mode 100644 (file)
index 0000000..b04e137
--- /dev/null
@@ -0,0 +1,1117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Information about the hive end point (i.e. table or partition) to write to.
+ * A light weight object that does NOT internally hold on to resources such as
+ * network connections. It can be stored in Hashed containers such as sets and hash tables.
+ */
+public class HiveEndPoint {
+  public final String metaStoreUri;
+  public final String database;
+  public final String table;
+  public final ArrayList<String> partitionVals;
+
+
+  static final private Logger LOG = LoggerFactory.getLogger(HiveEndPoint.class.getName());
+
+  /**
+   *
+   * @param metaStoreUri   URI of the metastore to connect to eg: thrift://localhost:9083
+   * @param database       Name of the Hive database
+   * @param table          Name of table to stream to
+   * @param partitionVals  Indicates the specific partition to stream to. Can be null or empty List
+   *                       if streaming to a table without partitions. The order of values in this
+   *                       list must correspond exactly to the order of partition columns specified
+   *                       during the table creation. E.g. For a table partitioned by
+   *                       (continent string, country string), partitionVals could be the list
+   *                       ("Asia", "India").
+   */
+  public HiveEndPoint(String metaStoreUri
+          , String database, String table, List<String> partitionVals) {
+    this.metaStoreUri = metaStoreUri;
+    if (database==null) {
+      throw new IllegalArgumentException("Database cannot be null for HiveEndPoint");
+    }
+    this.database = database;
+    this.table = table;
+    if (table==null) {
+      throw new IllegalArgumentException("Table cannot be null for HiveEndPoint");
+    }
+    this.partitionVals = partitionVals==null ? new ArrayList<String>()
+                                             : new ArrayList<String>( partitionVals );
+  }
+
+
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, String)}
+   */
+  @Deprecated
+  public StreamingConnection newConnection(final boolean createPartIfNotExists)
+    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+    , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, null, null, null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, HiveConf, String)}
+   */
+  @Deprecated
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf)
+    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+    , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, conf, null, null);
+  }
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #newConnection(boolean, HiveConf, UserGroupInformation, String)}
+   */
+  @Deprecated
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
+                                           final UserGroupInformation authenticatedUser)
+    throws ConnectionError, InvalidPartition,
+    InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, conf, authenticatedUser, null);
+  }
+  /**
+   * Acquire a new connection to MetaStore for streaming
+   * @param createPartIfNotExists If true, the partition specified in the endpoint
+   *                              will be auto created if it does not exist
+   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
+   *                  should be something that can be correlated with calling application log files
+   *                  and/or monitoring consoles.
+   * @return
+   * @throws ConnectionError if problem connecting
+   * @throws InvalidPartition  if specified partition is not valid (createPartIfNotExists = false)
+   * @throws ImpersonationFailed  if not able to impersonate 'proxyUser'
+   * @throws PartitionCreationFailed if failed to create partition
+   * @throws InterruptedException
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, String agentInfo)
+    throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+    , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, null, null, agentInfo);
+  }
+
+  /**
+   * Acquire a new connection to MetaStore for streaming
+   * @param createPartIfNotExists If true, the partition specified in the endpoint
+   *                              will be auto created if it does not exist
+   * @param conf HiveConf object, set it to null if not using advanced hive settings.
+   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
+   *                  should be something that can be correlated with calling application log files
+   *                  and/or monitoring consoles.
+   * @return
+   * @throws ConnectionError if problem connecting
+   * @throws InvalidPartition  if specified partition is not valid (createPartIfNotExists = false)
+   * @throws ImpersonationFailed  if not able to impersonate 'proxyUser'
+   * @throws PartitionCreationFailed if failed to create partition
+   * @throws InterruptedException
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, HiveConf conf, String agentInfo)
+          throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed
+          , ImpersonationFailed , InterruptedException {
+    return newConnection(createPartIfNotExists, conf, null, agentInfo);
+  }
+
+  /**
+   * Acquire a new connection to MetaStore for streaming. To connect using Kerberos,
+   *   'authenticatedUser' argument should have been used to do a kerberos login.  Additionally the
+   *   'hive.metastore.kerberos.principal' setting should be set correctly either in hive-site.xml or
+   *    in the 'conf' argument (if not null). If using hive-site.xml, it should be in classpath.
+   *
+   * @param createPartIfNotExists If true, the partition specified in the endpoint
+   *                              will be auto created if it does not exist
+   * @param conf               HiveConf object to be used for the connection. Can be null.
+   * @param authenticatedUser  UserGroupInformation object obtained from successful authentication.
+   *                           Uses non-secure mode if this argument is null.
+   * @param agentInfo should uniquely identify the process/entity that is using this batch.  This
+   *                  should be something that can be correlated with calling application log files
+   *                  and/or monitoring consoles.
+   * @return
+   * @throws ConnectionError if there is a connection problem
+   * @throws InvalidPartition  if specified partition is not valid (createPartIfNotExists = false)
+   * @throws ImpersonationFailed  if not able to impersonate 'username'
+   * @throws PartitionCreationFailed if failed to create partition
+   * @throws InterruptedException
+   */
+  public StreamingConnection newConnection(final boolean createPartIfNotExists, final HiveConf conf,
+                                           final UserGroupInformation authenticatedUser, final String agentInfo)
+          throws ConnectionError, InvalidPartition,
+               InvalidTable, PartitionCreationFailed, ImpersonationFailed , InterruptedException {
+
+    if( authenticatedUser==null ) {
+      return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
+    }
+
+    try {
+      return authenticatedUser.doAs (
+             new PrivilegedExceptionAction<StreamingConnection>() {
+                @Override
+                public StreamingConnection run()
+                        throws ConnectionError, InvalidPartition, InvalidTable
+                        , PartitionCreationFailed {
+                  return newConnectionImpl(authenticatedUser, createPartIfNotExists, conf, agentInfo);
+                }
+             }
+      );
+    } catch (IOException e) {
+      throw new ConnectionError("Failed to connect as : " + authenticatedUser.getShortUserName(), e);
+    }
+  }
+
+  private StreamingConnection newConnectionImpl(UserGroupInformation ugi,
+                                               boolean createPartIfNotExists, HiveConf conf, String agentInfo)
+          throws ConnectionError, InvalidPartition, InvalidTable
+          , PartitionCreationFailed {
+    return new ConnectionImpl(this, ugi, conf, createPartIfNotExists, agentInfo);
+  }
+
+  private static UserGroupInformation getUserGroupInfo(String user)
+          throws ImpersonationFailed {
+    try {
+      return UserGroupInformation.createProxyUser(
+              user, UserGroupInformation.getLoginUser());
+    } catch (IOException e) {
+      LOG.error("Unable to get UserGroupInfo for user : " + user, e);
+      throw new ImpersonationFailed(user,e);
+    }
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    HiveEndPoint endPoint = (HiveEndPoint) o;
+
+    if (database != null
+            ? !database.equals(endPoint.database)
+            : endPoint.database != null ) {
+      return false;
+    }
+    if (metaStoreUri != null
+            ? !metaStoreUri.equals(endPoint.metaStoreUri)
+            : endPoint.metaStoreUri != null ) {
+      return false;
+    }
+    if (!partitionVals.equals(endPoint.partitionVals)) {
+      return false;
+    }
+    if (table != null ? !table.equals(endPoint.table) : endPoint.table != null) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = metaStoreUri != null ? metaStoreUri.hashCode() : 0;
+    result = 31 * result + (database != null ? database.hashCode() : 0);
+    result = 31 * result + (table != null ? table.hashCode() : 0);
+    result = 31 * result + partitionVals.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "{" +
+            "metaStoreUri='" + metaStoreUri + '\'' +
+            ", database='" + database + '\'' +
+            ", table='" + table + '\'' +
+            ", partitionVals=" + partitionVals + " }";
+  }
+
+
+  private static class ConnectionImpl implements StreamingConnection {
+    private final IMetaStoreClient msClient;
+    private final IMetaStoreClient heartbeaterMSClient;
+    private final HiveEndPoint endPt;
+    private final UserGroupInformation ugi;
+    private final String username;
+    private final boolean secureMode;
+    private final String agentInfo;
+
+    /**
+     * @param endPoint end point to connect to
+     * @param ugi on behalf of whom streaming is done. cannot be null
+     * @param conf HiveConf object
+     * @param createPart create the partition if it does not exist
+     * @throws ConnectionError if there is trouble connecting
+     * @throws InvalidPartition if specified partition does not exist (and createPart=false)
+     * @throws InvalidTable if specified table does not exist
+     * @throws PartitionCreationFailed if createPart=true and not able to create partition
+     */
+    private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
+                           HiveConf conf, boolean createPart, String agentInfo)
+            throws ConnectionError, InvalidPartition, InvalidTable
+                   , PartitionCreationFailed {
+      this.endPt = endPoint;
+      this.ugi = ugi;
+      this.agentInfo = agentInfo;
+      this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName();
+      if (conf==null) {
+        conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
+      }
+      else {
+          overrideConfSettings(conf);
+      }
+      this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
+      this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
+      // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
+      // isolated from the other transaction related RPC calls.
+      this.heartbeaterMSClient = getMetaStoreClient(endPoint, conf, secureMode);
+      checkEndPoint(endPoint, msClient);
+      if (createPart  &&  !endPoint.partitionVals.isEmpty()) {
+        createPartitionIfNotExists(endPoint, msClient, conf);
+      }
+    }
+
+    /**
+     * Checks the validity of endpoint
+     * @param endPoint the HiveEndPoint to be checked
+     * @param msClient the metastore client
+     * @throws InvalidTable
+     */
+    private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient)
+        throws InvalidTable, ConnectionError {
+      Table t;
+      try {
+        t = msClient.getTable(endPoint.database, endPoint.table);
+      } catch (Exception e) {
+        LOG.warn("Unable to check the endPoint: " + endPoint, e);
+        throw new InvalidTable(endPoint.database, endPoint.table, e);
+      }
+      // 1 - check that the table is Acid
+      if (!AcidUtils.isFullAcidTable(t)) {
+        LOG.error("HiveEndPoint " + endPoint + " must use an acid table");
+        throw new InvalidTable(endPoint.database, endPoint.table, "is not an Acid table");
+      }
+
+      // 2 - check if partitionvals are legitimate
+      if (t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty()
+          && endPoint.partitionVals.isEmpty()) {
+        // Invalid if table is partitioned, but endPoint's partitionVals is empty
+        String errMsg = "HiveEndPoint " + endPoint + " doesn't specify any partitions for " +
+            "partitioned table";
+        LOG.error(errMsg);
+        throw new ConnectionError(errMsg);
+      }
+      if ((t.getPartitionKeys() == null || t.getPartitionKeys().isEmpty())
+          && !endPoint.partitionVals.isEmpty()) {
+        // Invalid if table is not partitioned, but endPoint's partitionVals is not empty
+        String errMsg = "HiveEndPoint" + endPoint + " specifies partitions for unpartitioned table";
+        LOG.error(errMsg);
+        throw new ConnectionError(errMsg);
+      }
+    }
+
+    /**
+     * Close connection
+     */
+    @Override
+    public void close() {
+      if (ugi==null) {
+        msClient.close();
+        heartbeaterMSClient.close();
+        return;
+      }
+      try {
+        ugi.doAs (
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                msClient.close();
+                heartbeaterMSClient.close();
+                return null;
+              }
+            } );
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+        }
+      } catch (IOException e) {
+        LOG.error("Error closing connection to " + endPt, e);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted when closing connection to " + endPt, e);
+      }
+    }
+
+    @Override
+    public UserGroupInformation getUserGroupInformation() {
+      return ugi;
+    }
+
+    /**
+     * Acquires a new batch of transactions from Hive.
+     *
+     * @param numTransactions is a hint from client indicating how many transactions client needs.
+     * @param recordWriter  Used to write record. The same writer instance can
+     *                      be shared with another TransactionBatch (to the same endpoint)
+     *                      only after the first TransactionBatch has been closed.
+     *                      Writer will be closed when the TransactionBatch is closed.
+     * @return
+     * @throws StreamingIOFailure if failed to create new RecordUpdater for batch
+     * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
+     * @throws ImpersonationFailed failed to run command as proxyUser
+     * @throws InterruptedException
+     */
+    @Override
+    public TransactionBatch fetchTransactionBatch(final int numTransactions,
+                                                      final RecordWriter recordWriter)
+            throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed
+                  , InterruptedException {
+      if (ugi==null) {
+        return fetchTransactionBatchImpl(numTransactions, recordWriter);
+      }
+      try {
+        return ugi.doAs (
+                new PrivilegedExceptionAction<TransactionBatch>() {
+                  @Override
+                  public TransactionBatch run() throws StreamingException, InterruptedException {
+                    return fetchTransactionBatchImpl(numTransactions, recordWriter);
+                  }
+                }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed to fetch Txn Batch as user '" + ugi.getShortUserName()
+                + "' when acquiring Transaction Batch on endPoint " + endPt, e);
+      }
+    }
+
+    private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
+                                                  RecordWriter recordWriter)
+            throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+      return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient,
+          heartbeaterMSClient, recordWriter, agentInfo);
+    }
+
+
+    private static void createPartitionIfNotExists(HiveEndPoint ep,
+                                                   IMetaStoreClient msClient, HiveConf conf)
+            throws InvalidTable, PartitionCreationFailed {
+      if (ep.partitionVals.isEmpty()) {
+        return;
+      }
+      SessionState localSession = null;
+      if(SessionState.get() == null) {
+        localSession = SessionState.start(new CliSessionState(conf));
+      }
+      IDriver driver = DriverFactory.newDriver(conf);
+
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Attempting to create partition (if not existent) " + ep);
+        }
+
+        List<FieldSchema> partKeys = msClient.getTable(ep.database, ep.table)
+                .getPartitionKeys();
+        runDDL(driver, "use " + ep.database);
+        String query = "alter table " + ep.table + " add if not exists partition "
+                + partSpecStr(partKeys, ep.partitionVals);
+        runDDL(driver, query);
+      } catch (MetaException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new PartitionCreationFailed(ep, e);
+      } catch (NoSuchObjectException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new InvalidTable(ep.database, ep.table);
+      } catch (TException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new PartitionCreationFailed(ep, e);
+      } catch (QueryFailedException e) {
+        LOG.error("Failed to create partition : " + ep, e);
+        throw new PartitionCreationFailed(ep, e);
+      } finally {
+        driver.close();
+        try {
+          if(localSession != null) {
+            localSession.close();
+          }
+        } catch (IOException e) {
+          LOG.warn("Error closing SessionState used to run Hive DDL.");
+        }
+      }
+    }
+
+    private static boolean runDDL(IDriver driver, String sql) throws QueryFailedException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Running Hive Query: " + sql);
+      }
+      driver.run(sql);
+      return true;
+    }
+
+    private static String partSpecStr(List<FieldSchema> partKeys, ArrayList<String> partVals) {
+      if (partKeys.size()!=partVals.size()) {
+        throw new IllegalArgumentException("Partition values:" + partVals +
+                ", does not match the partition Keys in table :" + partKeys );
+      }
+      StringBuilder buff = new StringBuilder(partKeys.size()*20);
+      buff.append(" ( ");
+      int i=0;
+      for (FieldSchema schema : partKeys) {
+        buff.append(schema.getName());
+        buff.append("='");
+        buff.append(partVals.get(i));
+        buff.append("'");
+        if (i!=partKeys.size()-1) {
+          buff.append(",");
+        }
+        ++i;
+      }
+      buff.append(" )");
+      return buff.toString();
+    }
+
+    private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf, boolean secureMode)
+            throws ConnectionError {
+
+      if (endPoint.metaStoreUri!= null) {
+        conf.setVar(HiveConf.ConfVars.METASTOREURIS, endPoint.metaStoreUri);
+      }
+      if(secureMode) {
+        conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL,true);
+      }
+      try {
+        return HCatUtil.getHiveMetastoreClient(conf);
+      } catch (MetaException e) {
+        throw new ConnectionError("Error connecting to Hive Metastore URI: "
+                + endPoint.metaStoreUri + ". " + e.getMessage(), e);
+      } catch (IOException e) {
+        throw new ConnectionError("Error connecting to Hive Metastore URI: "
+            + endPoint.metaStoreUri + ". " + e.getMessage(), e);
+      }
+    }
+  } // class ConnectionImpl
+
+  private static class TransactionBatchImpl implements TransactionBatch {
+    private final String username;
+    private final UserGroupInformation ugi;
+    private final HiveEndPoint endPt;
+    private final IMetaStoreClient msClient;
+    private final IMetaStoreClient heartbeaterMSClient;
+    private final RecordWriter recordWriter;
+    private final List<TxnToWriteId> txnToWriteIds;
+
+    //volatile because heartbeat() may be in a "different" thread; updates of this are "piggybacking"
+    private volatile int currentTxnIndex = -1;
+    private final String partNameForLock;
+    //volatile because heartbeat() may be in a "different" thread
+    private volatile TxnState state;
+    private LockRequest lockRequest = null;
+    /**
+     * once any operation on this batch encounters a system exception
+     * (e.g. IOException on write) it's safest to assume that we can't write to the
+     * file backing this batch any more.  This guards important public methods
+     */
+    private volatile boolean isClosed = false;
+    private final String agentInfo;
+    /**
+     * Tracks the state of each transaction
+     */
+    private final TxnState[] txnStatus;
+    /**
+     * ID of the last txn used by {@link #beginNextTransactionImpl()}
+     */
+    private long lastTxnUsed;
+
+    /**
+     * Represents a batch of transactions acquired from MetaStore
+     *
+     * @throws StreamingException if failed to create new RecordUpdater for batch
+     * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
+     */
+    private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt,
+        final int numTxns, final IMetaStoreClient msClient,
+        final IMetaStoreClient heartbeaterMSClient, RecordWriter recordWriter, String agentInfo)
+        throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+      boolean success = false;
+      try {
+        if ( endPt.partitionVals!=null   &&   !endPt.partitionVals.isEmpty() ) {
+          Table tableObj = msClient.getTable(endPt.database, endPt.table);
+          List<FieldSchema> partKeys = tableObj.getPartitionKeys();
+          partNameForLock = Warehouse.makePartName(partKeys, endPt.partitionVals);
+        } else {
+          partNameForLock = null;
+        }
+        this.username = user;
+        this.ugi = ugi;
+        this.endPt = endPt;
+        this.msClient = msClient;
+        this.heartbeaterMSClient = heartbeaterMSClient;
+        this.recordWriter = recordWriter;
+        this.agentInfo = agentInfo;
+
+        List<Long> txnIds = openTxnImpl(msClient, user, numTxns, ugi);
+        txnToWriteIds = allocateWriteIdsImpl(msClient, txnIds, ugi);
+        assert(txnToWriteIds.size() == numTxns);
+
+        txnStatus = new TxnState[numTxns];
+        for(int i = 0; i < txnStatus.length; i++) {
+          assert(txnToWriteIds.get(i).getTxnId() == txnIds.get(i));
+          txnStatus[i] = TxnState.OPEN;//Open matches Metastore state
+        }
+        this.state = TxnState.INACTIVE;
+
+        // The Write Ids returned for the transaction batch is also sequential
+        recordWriter.newBatch(txnToWriteIds.get(0).getWriteId(), txnToWriteIds.get(numTxns-1).getWriteId());
+        success = true;
+      } catch (TException e) {
+        throw new TransactionBatchUnAvailable(endPt, e);
+      } catch (IOException e) {
+        throw new TransactionBatchUnAvailable(endPt, e);
+      }
+      finally {
+        //clean up if above throws
+        markDead(success);
+      }
+    }
+
+    private List<Long> openTxnImpl(final IMetaStoreClient msClient, final String user, final int numTxns, UserGroupInformation ugi)
+            throws IOException, TException,  InterruptedException {
+      if(ugi==null) {
+        return  msClient.openTxns(user, numTxns).getTxn_ids();
+      }
+      return (List<Long>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          return msClient.openTxns(user, numTxns).getTxn_ids();
+        }
+      });
+    }
+
+    private List<TxnToWriteId> allocateWriteIdsImpl(final IMetaStoreClient msClient,
+                                                    final List<Long> txnIds, UserGroupInformation ugi)
+            throws IOException, TException,  InterruptedException {
+      if(ugi==null) {
+        return  msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table);
+      }
+      return (List<TxnToWriteId>) ugi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          return msClient.allocateTableWriteIdsBatch(txnIds, endPt.database, endPt.table);
+        }
+      });
+    }
+
+    @Override
+    public String toString() {
+      if (txnToWriteIds==null || txnToWriteIds.isEmpty()) {
+        return "{}";
+      }
+      StringBuilder sb = new StringBuilder(" TxnStatus[");
+      for(TxnState state : txnStatus) {
+        //'state' should not be null - future proofing
+        sb.append(state == null ? "N" : state);
+      }
+      sb.append("] LastUsed ").append(JavaUtils.txnIdToString(lastTxnUsed));
+      return "TxnId/WriteIds=[" + txnToWriteIds.get(0).getTxnId()
+              + "/" + txnToWriteIds.get(0).getWriteId()
+              + "..."
+              + txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId()
+              + "/" + txnToWriteIds.get(txnToWriteIds.size()-1).getWriteId()
+              + "] on endPoint = " + endPt + "; " + sb;
+    }
+
+    /**
+     * Activate the next available transaction in the current transaction batch
+     * @throws TransactionError failed to switch to next transaction
+     */
+    @Override
+    public void beginNextTransaction() throws TransactionError, ImpersonationFailed,
+            InterruptedException {
+      checkIsClosed();
+      if (ugi==null) {
+        beginNextTransactionImpl();
+        return;
+      }
+      try {
+        ugi.doAs (
+              new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws TransactionError {
+                  beginNextTransactionImpl();
+                  return null;
+                }
+              }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed switching to next Txn as user '" + username +
+                "' in Txn batch :" + this, e);
+      }
+    }
+
+    private void beginNextTransactionImpl() throws TransactionError {
+      state = TxnState.INACTIVE;//clear state from previous txn
+
+      if ((currentTxnIndex + 1) >= txnToWriteIds.size()) {
+        throw new InvalidTrasactionState("No more transactions available in" +
+                " current batch for end point : " + endPt);
+      }
+      ++currentTxnIndex;
+      state = TxnState.OPEN;
+      lastTxnUsed = getCurrentTxnId();
+      lockRequest = createLockRequest(endPt, partNameForLock, username, getCurrentTxnId(), agentInfo);
+      try {
+        LockResponse res = msClient.lock(lockRequest);
+        if (res.getState() != LockState.ACQUIRED) {
+          throw new TransactionError("Unable to acquire lock on " + endPt);
+        }
+      } catch (TException e) {
+        throw new TransactionError("Unable to acquire lock on " + endPt, e);
+      }
+    }
+
+    /**
+     * Get Id of currently open transaction.
+     * @return -1 if there is no open TX
+     */
+    @Override
+    public Long getCurrentTxnId() {
+      if (currentTxnIndex >= 0) {
+        return txnToWriteIds.get(currentTxnIndex).getTxnId();
+      }
+      return -1L;
+    }
+
+    /**
+     * Get Id of currently open transaction.
+     * @return -1 if there is no open TX
+     */
+    @Override
+    public Long getCurrentWriteId() {
+      if (currentTxnIndex >= 0) {
+        return txnToWriteIds.get(currentTxnIndex).getWriteId();
+      }
+      return -1L;
+    }
+
+    /**
+     * get state of current transaction
+     * @return
+     */
+    @Override
+    public TxnState getCurrentTransactionState() {
+      return state;
+    }
+
+    /**
+     * Remaining transactions are the ones that are not committed or aborted or active.
+     * Active transaction is not considered part of remaining txns.
+     * @return number of transactions remaining this batch.
+     */
+    @Override
+    public int remainingTransactions() {
+      if (currentTxnIndex>=0) {
+        return txnToWriteIds.size() - currentTxnIndex -1;
+      }
+      return txnToWriteIds.size();
+    }
+
+
+    /**
+     *  Write record using RecordWriter
+     * @param record  the data to be written
+     * @throws StreamingIOFailure I/O failure
+     * @throws SerializationError  serialization error
+     * @throws ImpersonationFailed error writing on behalf of proxyUser
+     * @throws InterruptedException
+     */
+    @Override
+    public void write(final byte[] record)
+            throws StreamingException, InterruptedException {
+      write(Collections.singletonList(record));
+    }
+    private void checkIsClosed() throws IllegalStateException {
+      if(isClosed) {
+        throw new IllegalStateException("TransactionBatch " + toString() + " has been closed()");
+      }
+    }
+    /**
+     * A transaction batch opens a single HDFS file and writes multiple transaction to it.  If there is any issue
+     * with the write, we can't continue to write to the same file any as it may be corrupted now (at the tail).
+     * This ensures that a client can't ignore these failures and continue to write.
+     */
+    private void markDead(boolean success) {
+      if(success) {
+        return;
+      }
+      isClosed = true;//also ensures that heartbeat() is no-op since client is likely doing it async
+      try {
+        abort(true);//abort all remaining txns
+      }
+      catch(Exception ex) {
+        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+      }
+      try {
+        closeImpl();
+      }
+      catch (Exception ex) {
+        LOG.error("Fatal error on " + toString() + "; cause " + ex.getMessage(), ex);
+      }
+    }
+
+
+    /**
+     *  Write records using RecordWriter
+     * @param records collection of rows to be written
+     * @throws StreamingException  serialization error
+     * @throws ImpersonationFailed error writing on behalf of proxyUser
+     * @throws InterruptedException
+     */
+    @Override
+    public void write(final Collection<byte[]> records)
+            throws StreamingException, InterruptedException,
+            ImpersonationFailed {
+      checkIsClosed();
+      boolean success = false;
+      try {
+        if (ugi == null) {
+          writeImpl(records);
+        } else {
+          ugi.doAs(
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws StreamingException {
+                writeImpl(records);
+                return null;
+              }
+            }
+          );
+        }
+        success = true;
+      } catch(SerializationError ex) {
+        //this exception indicates that a {@code record} could not be parsed and the
+        //caller can decide whether to drop it or send it to dead letter queue.
+        //rolling back the txn and retrying won't help since the tuple will be exactly the same
+        //when it's replayed.
+        success = true;
+        throw ex;
+      } catch(IOException e){
+        throw new ImpersonationFailed("Failed writing as user '" + username +
+          "' to endPoint :" + endPt + ". Transaction Id: "
+          + getCurrentTxnId(), e);
+      }
+      finally {
+        markDead(success);
+      }
+    }
+
+    private void writeImpl(Collection<byte[]> records)
+            throws StreamingException {
+      for (byte[] record : records) {
+        recordWriter.write(getCurrentWriteId(), record);
+      }
+    }
+
+
+    /**
+     * Commit the currently open transaction
+     * @throws TransactionError
+     * @throws StreamingIOFailure  if flushing records failed
+     * @throws ImpersonationFailed if
+     * @throws InterruptedException
+     */
+    @Override
+    public void commit()  throws TransactionError, StreamingException,
+           ImpersonationFailed, InterruptedException {
+      checkIsClosed();
+      boolean success = false;
+      try {
+        if (ugi == null) {
+          commitImpl();
+        }
+        else {
+          ugi.doAs(
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws StreamingException {
+                commitImpl();
+                return null;
+              }
+            }
+          );
+        }
+        success = true;
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed committing Txn ID " + getCurrentTxnId() + " as user '"
+                + username + "'on endPoint :" + endPt + ". Transaction Id: ", e);
+      }
+      finally {
+        markDead(success);
+      }
+    }
+
+    private void commitImpl() throws TransactionError, StreamingException {
+      try {
+        recordWriter.flush();
+        msClient.commitTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
+        state = TxnState.COMMITTED;
+        txnStatus[currentTxnIndex] = TxnState.COMMITTED;
+      } catch (NoSuchTxnException e) {
+        throw new TransactionError("Invalid transaction id : "
+                + getCurrentTxnId(), e);
+      } catch (TxnAbortedException e) {
+        throw new TransactionError("Aborted transaction cannot be committed"
+                , e);
+      } catch (TException e) {
+        throw new TransactionError("Unable to commit transaction"
+                + getCurrentTxnId(), e);
+      }
+    }
+
+    /**
+     * Abort the currently open transaction
+     * @throws TransactionError
+     */
+    @Override
+    public void abort() throws TransactionError, StreamingException
+                      , ImpersonationFailed, InterruptedException {
+      if(isClosed) {
+        /**
+         * isDead is only set internally by this class.  {@link #markDead(boolean)} will abort all
+         * remaining txns, so make this no-op to make sure that a well-behaved client that calls abort()
+         * error doesn't get misleading errors
+         */
+        return;
+      }
+      abort(false);
+    }
+    private void abort(final boolean abortAllRemaining) throws TransactionError, StreamingException
+        , ImpersonationFailed, InterruptedException {
+      if (ugi==null) {
+        abortImpl(abortAllRemaining);
+        return;
+      }
+      try {
+        ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+                  @Override
+                  public Void run() throws StreamingException {
+                    abortImpl(abortAllRemaining);
+                    return null;
+                  }
+                }
+        );
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed aborting Txn " + getCurrentTxnId()  + " as user '"
+                + username + "' on endPoint :" + endPt, e);
+      }
+    }
+
+    private void abortImpl(boolean abortAllRemaining) throws TransactionError, StreamingException {
+      try {
+        if(abortAllRemaining) {
+          //when last txn finished (abort/commit) the currentTxnIndex is pointing at that txn
+          //so we need to start from next one, if any.  Also if batch was created but
+          //fetchTransactionBatch() was never called, we want to start with first txn
+          int minOpenTxnIndex = Math.max(currentTxnIndex +
+            (state == TxnState.ABORTED || state == TxnState.COMMITTED ? 1 : 0), 0);
+          for(currentTxnIndex = minOpenTxnIndex;
+              currentTxnIndex < txnToWriteIds.size(); currentTxnIndex++) {
+            msClient.rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId());
+            txnStatus[currentTxnIndex] = TxnState.ABORTED;
+          }
+          currentTxnIndex--;//since the loop left it == txnToWriteIds.size()
+        }
+        else {
+          if (getCurrentTxnId() > 0) {
+            msClient.rollbackTxn(getCurrentTxnId());
+            txnStatus[currentTxnIndex] = TxnState.ABORTED;
+          }
+        }
+        state = TxnState.ABORTED;
+        recordWriter.clear();
+      } catch (NoSuchTxnException e) {
+        throw new TransactionError("Unable to abort invalid transaction id : "
+                + getCurrentTxnId(), e);
+      } catch (TException e) {
+        throw new TransactionError("Unable to abort transaction id : "
+                + getCurrentTxnId(), e);
+      }
+    }
+
+    @Override
+    public void heartbeat() throws StreamingException, HeartBeatFailure {
+      if(isClosed) {
+        return;
+      }
+      if(state != TxnState.OPEN && currentTxnIndex >= txnToWriteIds.size() - 1) {
+        //here means last txn in the batch is resolved but the close() hasn't been called yet so
+        //there is nothing to heartbeat
+        return;
+      }
+      //if here after commit()/abort() but before next beginNextTransaction(), currentTxnIndex still
+      //points at the last txn which we don't want to heartbeat
+      Long first = txnToWriteIds.get(state == TxnState.OPEN ? currentTxnIndex : currentTxnIndex + 1).getTxnId();
+      Long last = txnToWriteIds.get(txnToWriteIds.size()-1).getTxnId();
+      try {
+        HeartbeatTxnRangeResponse resp = heartbeaterMSClient.heartbeatTxnRange(first, last);
+        if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
+          throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch());
+        }
+      } catch (TException e) {
+        throw new StreamingException("Failure to heartbeat on ids (" + first + "src/gen/thrift"
+                + last + ") on end point : " + endPt );
+      }
+    }
+
+    @Override
+    public boolean isClosed() {
+      return isClosed;
+    }
+    /**
+     * Close the TransactionBatch.  This will abort any still open txns in this batch.
+     * @throws StreamingIOFailure I/O failure when closing transaction batch
+     */
+    @Override
+    public void close() throws StreamingException, ImpersonationFailed, InterruptedException {
+      if(isClosed) {
+        return;
+      }
+      isClosed = true;
+      abortImpl(true);//abort proactively so that we don't wait for timeout
+      closeImpl();//perhaps we should add a version of RecordWriter.closeBatch(boolean abort) which
+      //will call RecordUpdater.close(boolean abort)
+    }
+    private void closeImpl() throws StreamingException, InterruptedException{
+      state = TxnState.INACTIVE;
+      if(ugi == null) {
+        recordWriter.closeBatch();
+        return;
+      }
+      try {
+        ugi.doAs (
+                new PrivilegedExceptionAction<Void>() {
+                  @Override
+                  public Void run() throws StreamingException {
+                    recordWriter.closeBatch();
+                    return null;
+                  }
+                }
+        );
+        try {
+          FileSystem.closeAllForUGI(ugi);
+        } catch (IOException exception) {
+          LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+        }
+      } catch (IOException e) {
+        throw new ImpersonationFailed("Failed closing Txn Batch as user '" + username +
+                "' on  endPoint :" + endPt, e);
+      }
+    }
+
+    private static LockRequest createLockRequest(final HiveEndPoint hiveEndPoint,
+            String partNameForLock, String user, long txnId, String agentInfo)  {
+      LockRequestBuilder rqstBuilder = agentInfo == null ?
+        new LockRequestBuilder() : new LockRequestBuilder(agentInfo);
+      rqstBuilder.setUser(user);
+      rqstBuilder.setTransactionId(txnId);
+
+      LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+              .setDbName(hiveEndPoint.database)
+              .setTableName(hiveEndPoint.table)
+              .setShared()
+              .setOperationType(DataOperationType.INSERT);
+      if (partNameForLock!=null && !partNameForLock.isEmpty() ) {
+          lockCompBuilder.setPartitionName(partNameForLock);
+      }
+      rqstBuilder.addLockComponent(lockCompBuilder.build());
+
+      return rqstBuilder.build();
+    }
+  } // class TransactionBatchImpl
+
+  static HiveConf createHiveConf(Class<?> clazz, String metaStoreUri) {
+    HiveConf conf = new HiveConf(clazz);
+    if (metaStoreUri!= null) {
+      setHiveConf(conf, HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
+    }
+    HiveEndPoint.overrideConfSettings(conf);
+    return conf;
+  }
+
+  private static void overrideConfSettings(HiveConf conf) {
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_TXN_MANAGER,
+            "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    setHiveConf(conf, HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+    // Avoids creating Tez Client sessions internally as it takes much longer currently
+    setHiveConf(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "mr");
+  }
+
+  private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, String value) {
+    if( LOG.isDebugEnabled() ) {
+      LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
+    }
+    conf.setVar(var, value);
+  }
+
+  private static void setHiveConf(HiveConf conf, HiveConf.ConfVars var, boolean value) {
+    if( LOG.isDebugEnabled() ) {
+      LOG.debug("Overriding HiveConf setting : " + var + " = " + value);
+    }
+    conf.setBoolVar(var, value);
+  }
+
+}  // class HiveEndPoint
diff --git a/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java b/streaming/src/java/org/apache/hive/streaming/ImpersonationFailed.java
new file mode 100644 (file)
index 0000000..23e17e7
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class ImpersonationFailed extends StreamingException {
+  public ImpersonationFailed(String username, Exception e) {
+    super("Failed to impersonate user " + username, e);
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java b/streaming/src/java/org/apache/hive/streaming/InvalidColumn.java
new file mode 100644 (file)
index 0000000..0011b14
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidColumn extends StreamingException {
+
+  public InvalidColumn(String msg) {
+    super(msg);
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java b/streaming/src/java/org/apache/hive/streaming/InvalidPartition.java
new file mode 100644 (file)
index 0000000..f1f9804
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidPartition extends StreamingException {
+
+  public InvalidPartition(String partitionName, String partitionValue) {
+    super("Invalid partition: Name=" + partitionName +
+            ", Value=" + partitionValue);
+  }
+
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTable.java b/streaming/src/java/org/apache/hive/streaming/InvalidTable.java
new file mode 100644 (file)
index 0000000..ef1c91d
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidTable extends StreamingException {
+
+  private static String makeMsg(String db, String table) {
+    return "Invalid table db:" + db + ", table:" + table;
+  }
+
+  public InvalidTable(String db, String table) {
+    super(makeMsg(db,table), null);
+  }
+
+  public InvalidTable(String db, String table, String msg) {
+    super(makeMsg(db, table) + ": " + msg, null);
+  }
+
+  public InvalidTable(String db, String table, Exception inner) {
+    super(makeMsg(db, table) + ": " + inner.getMessage(), inner);
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java b/streaming/src/java/org/apache/hive/streaming/InvalidTrasactionState.java
new file mode 100644 (file)
index 0000000..762f5f8
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class InvalidTrasactionState extends TransactionError {
+  public InvalidTrasactionState(String msg) {
+    super(msg);
+  }
+
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java b/streaming/src/java/org/apache/hive/streaming/PartitionCreationFailed.java
new file mode 100644 (file)
index 0000000..5f9aca6
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class PartitionCreationFailed extends StreamingException {
+  public PartitionCreationFailed(HiveEndPoint endPoint, Exception cause) {
+    super("Failed to create partition " + endPoint, cause);
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java b/streaming/src/java/org/apache/hive/streaming/QueryFailedException.java
new file mode 100644 (file)
index 0000000..ccd3ae0
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class QueryFailedException extends StreamingException {
+  String query;
+
+  public QueryFailedException(String query, Exception e) {
+    super("Query failed: " + query + ". Due to :" + e.getMessage(), e);
+    this.query = query;
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/RecordWriter.java b/streaming/src/java/org/apache/hive/streaming/RecordWriter.java
new file mode 100644 (file)
index 0000000..dc6d70e
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public interface RecordWriter {
+
+  /** Writes using a hive RecordUpdater
+   *
+   * @param writeId the write ID of the table mapping to Txn in which the write occurs
+   * @param record the record to be written
+   */
+  void write(long writeId, byte[] record) throws StreamingException;
+
+  /** Flush records from buffer. Invoked by TransactionBatch.commit() */
+  void flush() throws StreamingException;
+
+  /** Clear bufferred writes. Invoked by TransactionBatch.abort() */
+  void clear() throws StreamingException;
+
+  /** Acquire a new RecordUpdater. Invoked when
+   * StreamingConnection.fetchTransactionBatch() is called */
+  void newBatch(Long minWriteId, Long maxWriteID) throws StreamingException;
+
+  /** Close the RecordUpdater. Invoked by TransactionBatch.close() */
+  void closeBatch() throws StreamingException;
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/SerializationError.java b/streaming/src/java/org/apache/hive/streaming/SerializationError.java
new file mode 100644 (file)
index 0000000..a57ba00
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public class SerializationError extends StreamingException {
+  public SerializationError(String msg, Exception e) {
+    super(msg,e);
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/StreamingConnection.java
new file mode 100644 (file)
index 0000000..2f760ea
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Represents a connection to a HiveEndPoint. Used to acquire transaction batches.
+ * Note: the expectation is that there is at most 1 TransactionBatch outstanding for any given
+ * StreamingConnection.  Violating this may result in "out of sequence response".
+ */
+public interface StreamingConnection {
+
+  /**
+   * Acquires a new batch of transactions from Hive.
+
+   * @param numTransactionsHint is a hint from client indicating how many transactions client needs.
+   * @param writer  Used to write record. The same writer instance can
+   *                      be shared with another TransactionBatch (to the same endpoint)
+   *                      only after the first TransactionBatch has been closed.
+   *                      Writer will be closed when the TransactionBatch is closed.
+   * @return
+   * @throws ConnectionError
+   * @throws InvalidPartition
+   * @throws StreamingException
+   * @return a batch of transactions
+   */
+  public TransactionBatch fetchTransactionBatch(int numTransactionsHint,
+                                                RecordWriter writer)
+          throws ConnectionError, StreamingException, InterruptedException;
+
+  /**
+   * Close connection
+   */
+  public void close();
+
+  /**
+   * @return UserGroupInformation associated with this connection or {@code null} if there is none
+   */
+  UserGroupInformation getUserGroupInformation();
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingException.java b/streaming/src/java/org/apache/hive/streaming/StreamingException.java
new file mode 100644 (file)
index 0000000..a7f84c1
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class StreamingException extends Exception {
+  public StreamingException(String msg, Exception cause) {
+    super(msg, cause);
+  }
+  public StreamingException(String msg) {
+    super(msg);
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java b/streaming/src/java/org/apache/hive/streaming/StreamingIOFailure.java
new file mode 100644 (file)
index 0000000..0dfbfa7
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+public class StreamingIOFailure extends StreamingException {
+
+  public StreamingIOFailure(String msg, Exception cause) {
+    super(msg, cause);
+  }
+
+  public StreamingIOFailure(String msg) {
+    super(msg);
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java
new file mode 100644 (file)
index 0000000..0077913
--- /dev/null
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.hcatalog.data.HCatRecordObjectInspector;
+import org.apache.hive.hcatalog.data.JsonSerDe;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Streaming Writer handles utf8 encoded Json (Strict syntax).
+ * Uses org.apache.hive.hcatalog.data.JsonSerDe to process Json input
+ */
+public class StrictJsonWriter extends AbstractRecordWriter {
+  private JsonSerDe serde;
+
+  private final HCatRecordObjectInspector recordObjInspector;
+  private final ObjectInspector[] bucketObjInspectors;
+  private final StructField[] bucketStructFields;
+
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint)
+    throws ConnectionError, SerializationError, StreamingException {
+    this(endPoint, null, null);
+  }
+
+  /**
+   * @deprecated As of release 1.3/2.1.  Replaced by {@link #StrictJsonWriter(HiveEndPoint, HiveConf, StreamingConnection)}
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) throws StreamingException {
+    this(endPoint, conf, null);
+  }
+  /**
+   * @param endPoint the end point to write to
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint, StreamingConnection conn)
+          throws ConnectionError, SerializationError, StreamingException {
+    this(endPoint, null, conn);
+  }
+  /**
+   * @param endPoint the end point to write to
+   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+          throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, conf, conn);
+    this.serde = createSerde(tbl, conf);
+    // get ObjInspectors for entire record and bucketed cols
+    try {
+      recordObjInspector = ( HCatRecordObjectInspector ) serde.getObjectInspector();
+      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+    }
+
+    // get StructFields for bucketed cols
+    bucketStructFields = new StructField[bucketIds.size()];
+    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketStructFields[i] = allFields.get(bucketIds.get(i));
+    }
+  }
+
+  @Override
+  public AbstractSerDe getSerde() {
+    return serde;
+  }
+
+  protected HCatRecordObjectInspector getRecordObjectInspector() {
+    return recordObjInspector;
+  }
+
+  @Override
+  protected StructField[] getBucketStructFields() {
+    return bucketStructFields;
+  }
+
+  protected ObjectInspector[] getBucketObjectInspectors() {
+    return bucketObjInspectors;
+  }
+
+
+  @Override
+  public void write(long writeId, byte[] record)
+          throws StreamingIOFailure, SerializationError {
+    try {
+      Object encodedRow = encode(record);
+      int bucket = getBucket(encodedRow);
+      getRecordUpdater(bucket).insert(writeId, encodedRow);
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error writing record in transaction write id("
+              + writeId + ")", e);
+    }
+
+  }
+
+  /**
+   * Creates JsonSerDe
+   * @param tbl   used to create serde
+   * @param conf  used to create serde
+   * @return
+   * @throws SerializationError if serde could not be initialized
+   */
+  private static JsonSerDe createSerde(Table tbl, HiveConf conf)
+          throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      JsonSerDe serde = new JsonSerDe();
+      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde " + JsonSerDe.class.getName(), e);
+    }
+  }
+
+  @Override
+  public Object encode(byte[] utf8StrRecord) throws SerializationError {
+    try {
+      Text blob = new Text(utf8StrRecord);
+      return serde.deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into Object", e);
+    }
+  }
+
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java b/streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java
new file mode 100644 (file)
index 0000000..c0b7324
--- /dev/null
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.RegexSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Streaming Writer handles text input data with regex. Uses
+ * org.apache.hadoop.hive.serde2.RegexSerDe
+ */
+public class StrictRegexWriter extends AbstractRecordWriter {
+  private RegexSerDe serde;
+  private final StructObjectInspector recordObjInspector;
+  private final ObjectInspector[] bucketObjInspectors;
+  private final StructField[] bucketStructFields;
+
+  /**
+   * @param endPoint the end point to write to
+   * @param conn     connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(HiveEndPoint endPoint, StreamingConnection conn)
+    throws ConnectionError, SerializationError, StreamingException {
+    this(null, endPoint, null, conn);
+  }
+
+  /**
+   * @param endPoint the end point to write to
+   * @param conf     a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn     connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+    throws ConnectionError, SerializationError, StreamingException {
+    this(null, endPoint, conf, conn);
+  }
+
+  /**
+   * @param regex    to parse the data
+   * @param endPoint the end point to write to
+   * @param conf     a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn     connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+    throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, conf, conn);
+    this.serde = createSerde(tbl, conf, regex);
+    // get ObjInspectors for entire record and bucketed cols
+    try {
+      recordObjInspector = (StructObjectInspector) serde.getObjectInspector();
+      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+    }
+
+    // get StructFields for bucketed cols
+    bucketStructFields = new StructField[bucketIds.size()];
+    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketStructFields[i] = allFields.get(bucketIds.get(i));
+    }
+  }
+
+  @Override
+  public AbstractSerDe getSerde() {
+    return serde;
+  }
+
+  @Override
+  protected StructObjectInspector getRecordObjectInspector() {
+    return recordObjInspector;
+  }
+
+  @Override
+  protected StructField[] getBucketStructFields() {
+    return bucketStructFields;
+  }
+
+  @Override
+  protected ObjectInspector[] getBucketObjectInspectors() {
+    return bucketObjInspectors;
+  }
+
+
+  @Override
+  public void write(long writeId, byte[] record)
+    throws StreamingIOFailure, SerializationError {
+    try {
+      Object encodedRow = encode(record);
+      int bucket = getBucket(encodedRow);
+      getRecordUpdater(bucket).insert(writeId, encodedRow);
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error writing record in transaction write id("
+        + writeId + ")", e);
+    }
+  }
+
+  /**
+   * Creates RegexSerDe
+   *
+   * @param tbl   used to create serde
+   * @param conf  used to create serde
+   * @param regex used to create serde
+   * @return
+   * @throws SerializationError if serde could not be initialized
+   */
+  private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
+    throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
+      ArrayList<String> tableColumns = getCols(tbl);
+      tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
+      RegexSerDe serde = new RegexSerDe();
+      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde " + RegexSerDe.class.getName(), e);
+    }
+  }
+
+  private static ArrayList<String> getCols(Table table) {
+    List<FieldSchema> cols = table.getSd().getCols();
+    ArrayList<String> colNames = new ArrayList<String>(cols.size());
+    for (FieldSchema col : cols) {
+      colNames.add(col.getName().toLowerCase());
+    }
+    return colNames;
+  }
+
+  /**
+   * Encode Utf8 encoded string bytes using RegexSerDe
+   *
+   * @param utf8StrRecord
+   * @return The encoded object
+   * @throws SerializationError
+   */
+  @Override
+  public Object encode(byte[] utf8StrRecord) throws SerializationError {
+    try {
+      Text blob = new Text(utf8StrRecord);
+      return serde.deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into Object", e);
+    }
+  }
+
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatch.java
new file mode 100644 (file)
index 0000000..2b05771
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+
+import java.util.Collection;
+
+/**
+ * Represents a set of Transactions returned by Hive. Supports opening, writing to
+ * and commiting/aborting each transaction. The interface is designed to ensure
+ * transactions in a batch are used up sequentially. To stream to the same HiveEndPoint
+ * concurrently, create separate StreamingConnections.
+ *
+ * Note on thread safety: At most 2 threads can run through a given TransactionBatch at the same
+ * time.  One thread may call {@link #heartbeat()} and the other all other methods.
+ * Violating this may result in "out of sequence response".
+ *
+ */
+public interface TransactionBatch  {
+  enum TxnState {
+    INACTIVE("I"), OPEN("O"), COMMITTED("C"), ABORTED("A");
+
+    private final String code;
+    TxnState(String code) {
+      this.code = code;
+    };
+    public String toString() {
+      return code;
+    }
+  }
+
+  /**
+   * Activate the next available transaction in the current transaction batch.
+   * @throws StreamingException if not able to switch to next Txn
+   * @throws InterruptedException if call in interrupted
+   */
+  void beginNextTransaction() throws StreamingException, InterruptedException;
+
+  /**
+   * Get Id of currently open transaction.
+   * @return transaction id
+   */
+  Long getCurrentTxnId();
+
+
+  /**
+   * Get write Id mapping to currently open transaction.
+   * @return write id
+   */
+  Long getCurrentWriteId();
+
+  /**
+   * get state of current transaction.
+   */
+  TxnState getCurrentTransactionState();
+
+  /**
+   * Commit the currently open transaction.
+   * @throws StreamingException if there are errors committing
+   * @throws InterruptedException if call in interrupted
+   */
+  void commit() throws StreamingException, InterruptedException;
+
+  /**
+   * Abort the currently open transaction.
+   * @throws StreamingException if there are errors
+   * @throws InterruptedException if call in interrupted
+   */
+  void abort() throws StreamingException, InterruptedException;
+
+  /**
+   * Remaining transactions are the ones that are not committed or aborted or open.
+   * Current open transaction is not considered part of remaining txns.
+   * @return number of transactions remaining this batch.
+   */
+  int remainingTransactions();
+
+
+  /**
+   *  Write record using RecordWriter.
+   * @param record  the data to be written
+   * @throws StreamingException if there are errors when writing
+   * @throws InterruptedException if call in interrupted
+   */
+  void write(byte[] record) throws StreamingException, InterruptedException;
+
+  /**
+   *  Write records using RecordWriter.
+   * @throws StreamingException if there are errors when writing
+   * @throws InterruptedException if call in interrupted
+   */
+  void write(Collection<byte[]> records) throws StreamingException, InterruptedException;
+
+
+  /**
+   * Issues a heartbeat to hive metastore on the current and remaining txn ids
+   * to keep them from expiring.
+   * @throws StreamingException if there are errors
+   */
+  void heartbeat() throws StreamingException;
+
+  /**
+   * Close the TransactionBatch.
+   * @throws StreamingException if there are errors closing batch
+   * @throws InterruptedException if call in interrupted
+   */
+  void close() throws StreamingException, InterruptedException;
+  boolean isClosed();
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java b/streaming/src/java/org/apache/hive/streaming/TransactionBatchUnAvailable.java
new file mode 100644 (file)
index 0000000..a8c8cd4
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class TransactionBatchUnAvailable extends StreamingException {
+  public TransactionBatchUnAvailable(HiveEndPoint ep, Exception e) {
+    super("Unable to acquire transaction batch on end point: " + ep, e);
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/TransactionError.java b/streaming/src/java/org/apache/hive/streaming/TransactionError.java
new file mode 100644 (file)
index 0000000..a331b20
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+public class TransactionError extends StreamingException {
+  public TransactionError(String msg, Exception e) {
+    super(msg + (e == null ? "" : ": " + e.getMessage()), e);
+  }
+
+  public TransactionError(String msg) {
+    super(msg);
+  }
+}
diff --git a/streaming/src/java/org/apache/hive/streaming/package.html b/streaming/src/java/org/apache/hive/streaming/package.html
new file mode 100644 (file)
index 0000000..2b45792
--- /dev/null
@@ -0,0 +1,181 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
+        "http://www.w3.org/TR/html4/loose.dtd">
+
+<html lang="en">
+
+<head>
+<meta name=Title content="HCatalog Streaming API">
+<meta name=Keywords content="HCatalog Streaming ACID">
+<meta http-equiv=Content-Type content="text/html; charset=macintosh">
+<title>HCatalog Streaming API</title>
+</head>
+
+<body>
+
+<h1>HCatalog Streaming API -- high level description</h1>
+
+<b>NOTE: The Streaming API feature is provided as a technology
+preview. The API may undergo incompatible changes in upcoming
+releases.</b>
+
+<p>
+Traditionally adding new data into hive requires gathering a large
+amount of data onto HDFS and then periodically adding a new
+partition. This is essentially a <i>batch insertion</i>. Insertion of
+new data into an existing partition or table is not done in a way that
+gives consistent results to readers. Hive Streaming API allows data to
+be pumped continuously into Hive. The incoming data can be
+continuously committed in small batches (of records) into a Hive
+partition. Once data is committed it becomes immediately visible to
+all Hive queries initiated subsequently.</p>
+
+<p>
+This API is intended for streaming clients such as NiFi, Flume and Storm,
+which continuously generate data. Streaming support is built on top of
+ACID based insert/update support in Hive.</p>
+
+<p>
+The classes and interfaces part of the Hive streaming API are broadly
+categorized into two. The first set provides support for connection
+and transaction management while the second set provides I/O
+support. Transactions are managed by the Hive MetaStore.  Writes are
+performed to HDFS via Hive wrapper APIs that bypass MetaStore. </p>
+
+<p>
+<b>Note on packaging</b>: The APIs are defined in the 
+<b>org.apache.hive.streaming</b> Java package and included as
+the hive-streaming jar.</p>
+
+<h2>STREAMING REQUIREMENTS</h2>
+
+<p>
+A few things are currently required to use streaming.
+</p>
+
+<p>
+<ol>
+  <li> Currently, only ORC storage format is supported. So 
+    '<b>stored as orc</b>' must be specified during table creation.</li>
+  <li> The hive table may be bucketed but must not be sorted. </li>
+  <li> User of the client streaming process must have the necessary 
+    permissions to write to the table or partition and create partitions in
+    the table.</li>
+  <li> Currently, when issuing queries on streaming tables, query client must set
+    <ol>
+      <li><b>hive.input.format =
+             org.apache.hadoop.hive.ql.io.HiveInputFormat</b></li>
+    </ol></li>
+  The above client settings are a temporary requirement and the intention is to
+  drop the need for them in the near future.
+  <li> Settings required in hive-site.xml for Metastore:
+    <ol>
+      <li><b>hive.txn.manager =
+       org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</b></li>
+      <li><b>hive.support.concurrency = true </b> </li>
+      <li><b>hive.compactor.initiator.on = true</b> </li>
+      <li><b>hive.compactor.worker.threads > 0 </b> </li>
+    </ol></li>
+</ol></p>
+
+<p>
+<b>Note:</b> Streaming to <b>unpartitioned</b> tables is also
+supported.</p>
+
+<h2>Transaction and Connection management</h2>
+
+<p>
+The class <a href="HiveEndPoint.html"><b>HiveEndPoint</b></a> is a Hive end
+point to connect to. An endpoint is either a Hive table or
+partition. An endpoint is cheap to create and does not internally hold
+on to any network connections.  Invoking the newConnection method on
+it creates a new connection to the Hive MetaStore for streaming
+purposes.  It returns a
+<a href="StreamingConnection.html"><b>StreamingConnection</b></a>
+object. Multiple connections can be established on the same
+endpoint. StreamingConnection can then be used to initiate new
+transactions for performing I/O. </p>
+
+<h3>Dynamic Partition Creation:</h3> It is very likely that a setup in
+which data is being streamed continuously (e.g. Flume), it is
+desirable to have new partitions created automatically (say on a
+hourly basis). In such cases requiring the Hive admin to pre-create
+the necessary partitions may not be reasonable.  Consequently the
+streaming API allows streaming clients to create partitions as
+needed. <b>HiveEndPoind.newConnection()</b> accepts a argument to
+indicate if the partition should be auto created. Partition creation
+being an atomic action, multiple clients can race to create the
+partition, but only one would succeed, so streaming clients need not
+synchronize when creating a partition. The user of the client process
+needs to be given write permissions on the Hive table in order to
+create partitions.
+
+<h3>Batching Transactions:</h3> Transactions are implemented slightly
+differently than traditional database systems. Multiple transactions
+are grouped into a <i>Transaction Batch</i> and each transaction has
+an id. Data from each transaction batch gets a single file on HDFS,
+which eventually gets compacted with other files into a larger file
+automatically for efficiency.
+
+<h3>Basic Steps:</h3> After connection is established, a streaming
+client first requests for a new batch of transactions. In response it
+receives a set of transaction ids that are part of the transaction
+batch. Subsequently the client proceeds to consume one transaction at
+a time by initiating new transactions. Client will write() one or more
+records per transactions and either commit or abort the current
+transaction before switching to the next one. Each
+<b>TransactionBatch.write()</b> invocation automatically associates
+the I/O attempt with the current transaction id. The user of the
+streaming client needs to have write permissions to the partition or
+table.</p>
+
+<p>
+<b>Concurrency Note:</b> I/O can be performed on multiple
+<b>TransactionBatch</b>s concurrently. However the transactions within a
+transaction batch much be consumed sequentially.</p>
+
+<h2>Writing Data</h2>
+
+<p>
+These classes and interfaces provide support for writing the data to
+Hive within a transaction. 
+<a href="RecordWriter.html"><b>RecordWriter</b></a> is the interface
+implemented by all writers. A writer is responsible for taking a
+record in the form of a <b>byte[]</b> containing data in a known
+format (e.g. CSV) and writing it out in the format supported by Hive
+streaming. A <b>RecordWriter</b> may reorder or drop fields from the incoming
+record if necessary to map them to the corresponding columns in the
+Hive Table. A streaming client will instantiate an appropriate
+<b>RecordWriter</b> type and pass it to 
+<b>StreamingConnection.fetchTransactionBatch()</b>.  The streaming client 
+does not directly interact with the <b>RecordWriter</b> therafter, but 
+relies on the <b>TransactionBatch</b> to do so.</p>
+
+<p>
+Currently, out of the box, the streaming API provides two
+implementations of the <b>RecordWriter</b> interface. One handles delimited
+input data (such as CSV, tab separated, etc.  and the other for JSON
+(strict syntax). Support for other input formats can be provided by
+additional implementations of the <b>RecordWriter</b> interface.
+<ul>
+<li> <a href="DelimitedInputWriter.html"><b>DelimitedInputWriter</b></a> 
+- Delimited text input.</li>
+<li> <a href="StrictJsonWriter.html"><b>StrictJsonWriter</b></a> 
+- JSON text input.</li>
+  <li> <a href="StrictRegexWriter.html"><b>StrictRegexWriter</b></a>
+    - text input with regex.</li>
+</ul></p>
+
+<h2>Performance, Concurrency, Etc.</h2>
+<p>
+  Each StreamingConnection is writing data at the rate the underlying
+  FileSystem can accept it.  If that is not sufficient, multiple StreamingConnection objects can
+  be created concurrently.
+</p>
+<p>
+  Each StreamingConnection can have at most 1 outstanding TransactionBatch and each TransactionBatch
+  may have at most 2 threads operaing on it.
+  See <a href="TransactionBatch.html"><b>TransactionBatch</b></a>
+</p>
+</body>
+
+</html>
diff --git a/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java b/streaming/src/test/org/apache/hive/streaming/TestDelimitedInputWriter.java
new file mode 100644 (file)
index 0000000..f0843a1
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Assert;
+
+public class TestDelimitedInputWriter {
+  @Test
+  public void testFieldReordering() throws Exception {
+
+    ArrayList<String> colNames = Lists.newArrayList(new String[]{"col1", "col2", "col3", "col4", "col5"});
+    {//1)  test dropping fields - first middle  & last
+      String[] fieldNames = {null, "col2", null, "col4", null};
+      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.assertTrue(Arrays.equals(mapping, new int[]{-1, 1, -1, 3, -1}));
+    }
+
+    {//2)  test reordering
+      String[] fieldNames = {"col5", "col4", "col3", "col2", "col1"};
+      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.assertTrue( Arrays.equals(mapping, new int[]{4,3,2,1,0}) );
+    }
+
+    {//3)  test bad field names
+      String[] fieldNames = {"xyz", "abc", "col3", "col4", "as"};
+      try {
+        DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+        Assert.fail();
+      } catch (InvalidColumn e) {
+        // should throw
+      }
+    }
+
+    {//4)  test few field names
+      String[] fieldNames = {"col3", "col4"};
+      int[] mapping = DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.assertTrue( Arrays.equals(mapping, new int[]{2,3}) );
+    }
+
+    {//5)  test extra field names
+      String[] fieldNames = {"col5", "col4", "col3", "col2", "col1", "col1"};
+      try {
+      DelimitedInputWriter.getFieldReordering(fieldNames, colNames);
+      Assert.fail();
+      } catch (InvalidColumn e) {
+        //show throw
+      }
+    }
+  }
+}
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
new file mode 100644 (file)
index 0000000..6f63bfb
--- /dev/null
@@ -0,0 +1,2330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.streaming;
+
+import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.Validator;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.tools.FileDump;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TestStreaming {
+  private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class);
+
+  public static class RawFileSystem extends RawLocalFileSystem {
+    private static final URI NAME;
+    static {
+      try {
+        NAME = new URI("raw:///");
+      } catch (URISyntaxException se) {
+        throw new IllegalArgumentException("bad uri", se);
+      }
+    }
+
+    @Override
+    public URI getUri() {
+      return NAME;
+    }
+
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+      File file = pathToFile(path);
+      if (!file.exists()) {
+        throw new FileNotFoundException("Can't find " + path);
+      }
+      // get close enough
+      short mod = 0;
+      if (file.canRead()) {
+        mod |= 0444;
+      }
+      if (file.canWrite()) {
+        mod |= 0200;
+      }
+      if (file.canExecute()) {
+        mod |= 0111;
+      }
+      return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
+          file.lastModified(), file.lastModified(),
+          FsPermission.createImmutable(mod), "owen", "users", path);
+    }
+  }
+
+  private static final String COL1 = "id";
+  private static final String COL2 = "msg";
+
+  private final HiveConf conf;
+  private IDriver driver;
+  private final IMetaStoreClient msClient;
+
+  final String metaStoreURI = null;
+
+  // partitioned table
+  private final static String dbName = "testing";
+  private final static String tblName = "alerts";
+  private final static String[] fieldNames = new String[]{COL1,COL2};
+  List<String> partitionVals;
+  private static Path partLoc;
+  private static Path partLoc2;
+
+  // unpartitioned table
+  private final static String dbName2 = "testing2";
+  private final static String tblName2 = "alerts";
+  private final static String[] fieldNames2 = new String[]{COL1,COL2};
+
+
+  // for bucket join testing
+  private final static String dbName3 = "testing3";
+  private final static String tblName3 = "dimensionTable";
+  private final static String dbName4 = "testing4";
+  private final static String tblName4 = "factTable";
+  List<String> partitionVals2;
+
+
+  private final String PART1_CONTINENT = "Asia";
+  private final String PART1_COUNTRY = "India";
+
+  @Rule
+  public TemporaryFolder dbFolder = new TemporaryFolder();
+
+
+  public TestStreaming() throws Exception {
+    partitionVals = new ArrayList<String>(2);
+    partitionVals.add(PART1_CONTINENT);
+    partitionVals.add(PART1_COUNTRY);
+
+    partitionVals2 = new ArrayList<String>(1);
+    partitionVals2.add(PART1_COUNTRY);
+
+
+    conf = new HiveConf(this.getClass());
+    conf.set("fs.raw.impl", RawFileSystem.class.getName());
+    conf
+    .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    TxnDbUtil.setConfValues(conf);
+    if (metaStoreURI!=null) {
+      conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
+    }
+    conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+    dbFolder.create();
+
+
+    //1) Start from a clean slate (metastore)
+    TxnDbUtil.cleanDb(conf);
+    TxnDbUtil.prepDb(conf);
+
+    //2) obtain metastore clients
+    msClient = new HiveMetaStoreClient(conf);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    SessionState.start(new CliSessionState(conf));
+    driver = DriverFactory.newDriver(conf);
+    driver.setMaxRows(200002);//make sure Driver returns all results
+    // drop and recreate the necessary databases and tables
+    dropDB(msClient, dbName);
+
+    String[] colNames = new String[] {COL1, COL2};
+    String[] colTypes = new String[] {serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME};
+    String[] bucketCols = new String[] {COL1};
+    String loc1 = dbFolder.newFolder(dbName + ".db").toString();
+    String[] partNames = new String[]{"Continent", "Country"};
+    partLoc = createDbAndTable(driver, dbName, tblName, partitionVals, colNames, colTypes, bucketCols, partNames, loc1, 1);
+
+    dropDB(msClient, dbName2);
+    String loc2 = dbFolder.newFolder(dbName2 + ".db").toString();
+    partLoc2 = createDbAndTable(driver, dbName2, tblName2, null, colNames, colTypes, bucketCols, null, loc2, 2);
+
+    String loc3 = dbFolder.newFolder("testing5.db").toString();
+    createStoreSales("testing5", loc3);
+
+    runDDL(driver, "drop table testBucketing3.streamedtable");
+    runDDL(driver, "drop table testBucketing3.finaltable");
+    runDDL(driver, "drop table testBucketing3.nobucket");
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    msClient.close();
+    driver.close();
+  }
+
+  private static List<FieldSchema> getPartitionKeys() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    // Defining partition names in unsorted order
+    fields.add(new FieldSchema("continent", serdeConstants.STRING_TYPE_NAME, ""));
+    fields.add(new FieldSchema("country", serdeConstants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+
+  private void createStoreSales(String dbName, String loc) throws Exception {
+    String dbUri = "raw://" + new Path(loc).toUri().toString();
+    String tableLoc = dbUri + Path.SEPARATOR + "store_sales";
+
+    boolean success = runDDL(driver, "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'");
+    Assert.assertTrue(success);
+    success = runDDL(driver, "use " + dbName);
+    Assert.assertTrue(success);
+
+    success = runDDL(driver, "drop table if exists store_sales");
+    Assert.assertTrue(success);
+    success = runDDL(driver, "create table store_sales\n" +
+      "(\n" +
+      "    ss_sold_date_sk           int,\n" +
+      "    ss_sold_time_sk           int,\n" +
+      "    ss_item_sk                int,\n" +
+      "    ss_customer_sk            int,\n" +
+      "    ss_cdemo_sk               int,\n" +
+      "    ss_hdemo_sk               int,\n" +
+      "    ss_addr_sk                int,\n" +
+      "    ss_store_sk               int,\n" +
+      "    ss_promo_sk               int,\n" +
+      "    ss_ticket_number          int,\n" +
+      "    ss_quantity               int,\n" +
+      "    ss_wholesale_cost         decimal(7,2),\n" +
+      "    ss_list_price             decimal(7,2),\n" +
+      "    ss_sales_price            decimal(7,2),\n" +
+      "    ss_ext_discount_amt       decimal(7,2),\n" +
+      "    ss_ext_sales_price        decimal(7,2),\n" +
+      "    ss_ext_wholesale_cost     decimal(7,2),\n" +
+      "    ss_ext_list_price         decimal(7,2),\n" +
+      "    ss_ext_tax                decimal(7,2),\n" +
+      "    ss_coupon_amt             decimal(7,2),\n" +
+      "    ss_net_paid               decimal(7,2),\n" +
+      "    ss_net_paid_inc_tax       decimal(7,2),\n" +
+      "    ss_net_profit             decimal(7,2)\n" +
+      ")\n" +
+      " partitioned by (dt string)\n" +
+      "clustered by (ss_store_sk, ss_promo_sk)\n" +
+      "INTO 4 BUCKETS stored as orc " + " location '" + tableLoc +  "'" + "  TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
+    Assert.assertTrue(success);
+
+    success = runDDL(driver, "alter table store_sales add partition(dt='2015')");
+    Assert.assertTrue(success);
+  }
+  /**
+   * make sure it works with table where bucket col is not 1st col
+   * @throws Exception
+   */
+  @Test
+  public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
+    List<String> partitionVals = new ArrayList<String>();
+    partitionVals.add("2015");
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", "store_sales", partitionVals);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk",
+      "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
+      "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
+      "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt, connection);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+
+    StringBuilder row = new StringBuilder();
+    for(int i = 0; i < 10; i++) {
+      for(int ints = 0; ints < 11; ints++) {
+        row.append(ints).append(',');
+      }
+      for(int decs = 0; decs < 12; decs++) {
+        row.append(i + 0.1).append(',');
+      }
+      row.setLength(row.length() - 1);
+      txnBatch.write(row.toString().getBytes());
+    }
+    txnBatch.commit();
+    txnBatch.close();
+    connection.close();
+
+    ArrayList<String> res = queryTable(driver, "select row__id.bucketid, * from testing5.store_sales");
+    for (String re : res) {
+      System.out.println(re);
+    }
+  }
+
+  /**
+   * Test that streaming can write to unbucketed table.
+   */
+  @Test
+  public void testNoBuckets() throws Exception {
+    queryTable(driver, "drop table if exists default.streamingnobuckets");
+    //todo: why does it need transactional_properties?
+    queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')");
+    queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')");
+    List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
+    Assert.assertEquals(1, rs.size());
+    Assert.assertEquals("foo\tbar", rs.get(0));
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "streamingnobuckets", null);
+    String[] colNames1 = new String[] { "a", "b" };
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",",  endPt, connection);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, wr);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("a1,b2".getBytes());
+    txnBatch.write("a3,b4".getBytes());
+    txnBatch.commit();
+    txnBatch.beginNextTransaction();
+    txnBatch.write("a5,b6".getBytes());
+    txnBatch.write("a7,b8".getBytes());
+    txnBatch.commit();
+    txnBatch.close();
+
+    Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+    rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
+
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+    Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8"));
+    Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000"));
+
+    queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'");
+    queryTable(driver, "delete from default.streamingnobuckets where a='a1'");
+    rs = queryTable(driver, "select a, b from default.streamingnobuckets order by a, b");
+    int row = 0;
+    Assert.assertEquals("at row=" + row, "0\t0", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "a3\tb4", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "a5\tb6", rs.get(row++));
+    Assert.assertEquals("at row=" + row, "foo\tbar", rs.get(row++));
+
+    queryTable(driver, "alter table default.streamingnobuckets compact 'major'");
+    runWorker(conf);
+    rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
+
+    Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+  }
+
+  /**
+   * this is a clone from TestTxnStatement2....
+   */
+  public static void runWorker(HiveConf hiveConf) throws MetaException {
+    AtomicBoolean stop = new AtomicBoolean(true);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setConf(hiveConf);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+  }
+
+  // stream data into streaming table with N buckets, then copy the data into another bucketed table
+  // check if bucketing in both was done in the same way
+  @Test
+  public void testStreamBucketingMatchesRegularBucketing() throws Exception {
+    int bucketCount = 100;
+
+    String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
+    String tableLoc  = "'" + dbUri + Path.SEPARATOR + "streamedtable" + "'";
+    String tableLoc2 = "'" + dbUri + Path.SEPARATOR + "finaltable" + "'";
+    String tableLoc3 = "'" + dbUri + Path.SEPARATOR + "nobucket" + "'";
+
+    runDDL(driver, "create database testBucketing3");
+    runDDL(driver, "use testBucketing3");
+    runDDL(driver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+            + bucketCount + " buckets  stored as orc  location " + tableLoc + " TBLPROPERTIES ('transactional'='true')") ;
+//  In 'nobucket' table we capture bucketid from streamedtable to workaround a hive bug that prevents joins two identically bucketed tables
+    runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + tableLoc3) ;
+    runDDL(driver, "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+            + bucketCount + " buckets  stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')");
+
+
+    String[] records = new String[] {
+    "PSFAHYLZVC,29,EPNMA",
+    "PPPRKWAYAU,96,VUTEE",
+    "MIAOFERCHI,3,WBDSI",
+    "CEGQAZOWVN,0,WCUZL",
+    "XWAKMNSVQF,28,YJVHU",
+    "XBWTSAJWME,2,KDQFO",
+    "FUVLQTAXAY,5,LDSDG",
+    "QTQMDJMGJH,6,QBOMA",
+    "EFLOTLWJWN,71,GHWPS",
+    "PEQNAOJHCM,82,CAAFI",
+    "MOEKQLGZCP,41,RUACR",
+    "QZXMCOPTID,37,LFLWE",
+    "EYALVWICRD,13,JEZLC",
+    "VYWLZAYTXX,16,DMVZX",
+    "OSALYSQIXR,47,HNZVE",
+    "JGKVHKCEGQ,25,KSCJB",
+    "WQFMMYDHET,12,DTRWA",
+    "AJOVAYZKZQ,15,YBKFO",
+    "YAQONWCUAU,31,QJNHZ",
+    "DJBXUEUOEB,35,IYCBL"
+    };
+
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "streamedtable", null);
+    String[] colNames1 = new String[] { "key1", "key2", "data" };
+    DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",",  endPt);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, wr);
+    txnBatch.beginNextTransaction();
+
+    for (String record : records) {
+      txnBatch.write(record.toString().getBytes());
+    }
+
+    txnBatch.commit();
+    txnBatch.close();
+    connection.close();
+
+    ArrayList<String> res1 = queryTable(driver, "select row__id.bucketid, * from streamedtable order by key2");
+    for (String re : res1) {
+      System.out.println(re);
+    }
+
+    driver.run("insert into nobucket select row__id.bucketid,* from streamedtable");
+    runDDL(driver, " insert into finaltable select * from nobucket");
+    ArrayList<String> res2 = queryTable(driver, "select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid");
+    for (String s : res2) {
+      LOG.error(s);
+    }
+    Assert.assertTrue(res2.isEmpty());
+  }
+
+
+  @Test
+  public void testTableValidation() throws Exception {
+    int bucketCount = 100;
+
+    String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
+    String tbl1 = "validation1";
+    String tbl2 = "validation2";
+
+    String tableLoc  = "'" + dbUri + Path.SEPARATOR + tbl1 + "'";
+    String tableLoc2 = "'" + dbUri + Path.SEPARATOR + tbl2 + "'";
+
+    runDDL(driver, "create database testBucketing3");
+    runDDL(driver, "use testBucketing3");
+
+    runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into "
+            + bucketCount + " buckets  stored as orc  location " + tableLoc + " TBLPROPERTIES ('transactional'='false')") ;
+
+    runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into "
+            + bucketCount + " buckets  stored as orc  location " + tableLoc2 + " TBLPROPERTIES ('transactional'='false')") ;
+
+
+    try {
+      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation1", null);
+      endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+      Assert.assertTrue("InvalidTable exception was not thrown", false);
+    } catch (InvalidTable e) {
+      // expecting this exception
+    }
+    try {
+      HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "validation2", null);
+      endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+      Assert.assertTrue("InvalidTable exception was not thrown", false);
+    } catch (InvalidTable e) {
+      // expecting this exception
+    }
+  }
+
+  /**
+   * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, boolean, String...)} -
+   * there is little value in using InputFormat directly
+   */
+  @Deprecated
+  private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
+                                String... records) throws Exception {
+    ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
+    Assert.assertEquals(0, dir.getObsolete().size());
+    Assert.assertEquals(0, dir.getOriginalFiles().size());
+    List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+    System.out.println("Files found: ");
+    for (AcidUtils.ParsedDelta pd : current) {
+      System.out.println(pd.getPath().toString());
+    }
+    Assert.assertEquals(numExpectedFiles, current.size());
+
+    // find the absolute minimum transaction
+    long min = Long.MAX_VALUE;
+    long max = Long.MIN_VALUE;
+    for (AcidUtils.ParsedDelta pd : current) {
+      if (pd.getMaxWriteId() > max) {
+        max = pd.getMaxWriteId();
+      }
+      if (pd.getMinWriteId() < min) {
+        min = pd.getMinWriteId();
+      }
+    }
+    Assert.assertEquals(minTxn, min);
+    Assert.assertEquals(maxTxn, max);
+
+    InputFormat inf = new OrcInputFormat();
+    JobConf job = new JobConf();
+    job.set("mapred.input.dir", partitionPath.toString());
+    job.set(BUCKET_COUNT, Integer.toString(buckets));
+    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg");
+    job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string");
+    AcidUtils.setAcidOperationalProperties(job, true, null);
+    job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+    job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
+    InputSplit[] splits = inf.getSplits(job, buckets);
+    Assert.assertEquals(numExpectedFiles, splits.length);
+    org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
+            inf.getRecordReader(splits[0], job, Reporter.NULL);
+
+    NullWritable key = rr.createKey();
+    OrcStruct value = rr.createValue();
+    for (String record : records) {
+      Assert.assertEquals(true, rr.next(key, value));
+      Assert.assertEquals(record, value.toString());
+    }
+    Assert.assertEquals(false, rr.next(key, value));
+  }
+  /**
+   * @param validationQuery query to read from table to compare data against {@code records}
+   * @param records expected data.  each row is CVS list of values
+   */
+  private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
+                                String validationQuery, boolean vectorize, String... records) throws Exception {
+    ValidWriteIdList txns = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
+    Assert.assertEquals(0, dir.getObsolete().size());
+    Assert.assertEquals(0, dir.getOriginalFiles().size());
+    List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+    System.out.println("Files found: ");
+    for (AcidUtils.ParsedDelta pd : current) {
+      System.out.println(pd.getPath().toString());
+    }
+    Assert.assertEquals(numExpectedFiles, current.size());
+
+    // find the absolute minimum transaction
+    long min = Long.MAX_VALUE;
+    long max = Long.MIN_VALUE;
+    for (AcidUtils.ParsedDelta pd : current) {
+      if (pd.getMaxWriteId() > max) {
+        max = pd.getMaxWriteId();
+      }
+      if (pd.getMinWriteId() < min) {
+        min = pd.getMinWriteId();
+      }
+    }
+    Assert.assertEquals(minTxn, min);
+    Assert.assertEquals(maxTxn, max);
+    boolean isVectorizationEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
+    if(vectorize) {
+      conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    }
+
+    String currStrategy = conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY);
+    for(String strategy : ((Validator.StringSet)HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator()).getExpected()) {
+      //run it with each split strategy - make sure there are differences
+      conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, strategy.toUpperCase());
+      List<String> actualResult = queryTable(driver, validationQuery);
+      for (int i = 0; i < actualResult.size(); i++) {
+        Assert.assertEquals("diff at [" + i + "].  actual=" + actualResult + " expected=" +
+          Arrays.toString(records), records[i], actualResult.get(i));
+      }
+    }
+    conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, currStrategy);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorizationEnabled);
+  }
+
+  private void checkNothingWritten(Path partitionPath) throws Exception {
+    ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
+    Assert.assertEquals(0, dir.getObsolete().size());
+    Assert.assertEquals(0, dir.getOriginalFiles().size());
+    List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
+    Assert.assertEquals(0, current.size());
+  }
+
+  @Test
+  public void testEndpointConnection() throws Exception {
+    // For partitioned table, partitionVals are specified
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName()); //shouldn't throw
+    connection.close();
+
+    // For unpartitioned table, partitionVals are not specified
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    endPt.newConnection(false, "UT_" + Thread.currentThread().getName()).close(); // should not throw
+
+    // For partitioned table, partitionVals are not specified
+    try {
+      endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, null);
+      connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+      Assert.assertTrue("ConnectionError was not thrown", false);
+      connection.close();
+    } catch (ConnectionError e) {
+      // expecting this exception
+      String errMsg = "doesn't specify any partitions for partitioned table";
+      Assert.assertTrue(e.toString().endsWith(errMsg));
+    }
+
+    // For unpartitioned table, partition values are specified
+    try {
+      endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, partitionVals);
+      connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+      Assert.assertTrue("ConnectionError was not thrown", false);
+      connection.close();
+    } catch (ConnectionError e) {
+      // expecting this exception
+      String errMsg = "specifies partitions for unpartitioned table";
+      Assert.assertTrue(e.toString().endsWith(errMsg));
+    }
+  }
+
+  @Test
+  public void testAddPartition() throws Exception {
+    List<String> newPartVals = new ArrayList<String>(2);
+    newPartVals.add(PART1_CONTINENT);
+    newPartVals.add("Nepal");
+
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName
+            , newPartVals);
+
+    // Ensure partition is absent
+    try {
+      msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
+      Assert.assertTrue("Partition already exists", false);
+    } catch (NoSuchObjectException e) {
+      // expect this exception
+    }
+
+    // Create partition
+    Assert.assertNotNull(endPt.newConnection(true, "UT_" + Thread.currentThread().getName()));
+
+    // Ensure partition is present
+    Partition p = msClient.getPartition(endPt.database, endPt.table, endPt.partitionVals);
+    Assert.assertNotNull("Did not find added partition", p);
+  }
+
+  @Test
+  public void testTransactionBatchEmptyCommit() throws Exception {
+    // 1)  to partitioned table
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+
+    txnBatch.beginNextTransaction();
+    txnBatch.commit();
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+
+    // 2) To unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    writer = new DelimitedInputWriter(fieldNames2,",", endPt);
+    connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.commit();
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+  }
+
+  /**
+   * check that transactions that have not heartbeated and timedout get properly aborted
+   * @throws Exception
+   */
+  @Test
+  public void testTimeOutReaper() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(5, writer);
+    txnBatch.beginNextTransaction();
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS);
+    //ensure txn timesout
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.MILLISECONDS);
+    AcidHouseKeeperService houseKeeperService = new AcidHouseKeeperService();
+    houseKeeperService.setConf(conf);
+    houseKeeperService.run();
+    try {
+      //should fail because the TransactionBatch timed out
+      txnBatch.commit();
+    }
+    catch(TransactionError e) {
+      Assert.assertTrue("Expected aborted transaction", e.getCause() instanceof TxnAbortedException);
+    }
+    txnBatch.close();
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.commit();
+    txnBatch.beginNextTransaction();
+    houseKeeperService.run();
+    try {
+      //should fail because the TransactionBatch timed out
+      txnBatch.commit();
+    }
+    catch(TransactionError e) {
+      Assert.assertTrue("Expected aborted transaction", e.getCause() instanceof TxnAbortedException);
+    }
+    txnBatch.close();
+    connection.close();
+  }
+
+  @Test
+  public void testHeartbeat() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt, connection);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(5, writer);
+    txnBatch.beginNextTransaction();
+    //todo: this should ideally check Transaction heartbeat as well, but heartbeat
+    //timestamp is not reported yet
+    //GetOpenTxnsInfoResponse txnresp = msClient.showTxns();
+    ShowLocksRequest request = new ShowLocksRequest();
+    request.setDbname(dbName2);
+    request.setTablename(tblName2);
+    ShowLocksResponse response = msClient.showLocks(request);
+    Assert.assertEquals("Wrong nubmer of locks: " + response, 1, response.getLocks().size());
+    ShowLocksResponseElement lock = response.getLocks().get(0);
+    long acquiredAt = lock.getAcquiredat();
+    long heartbeatAt = lock.getLastheartbeat();
+    txnBatch.heartbeat();
+    response = msClient.showLocks(request);
+    Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size());
+    lock = response.getLocks().get(0);
+    Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat());
+    Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
+      ") == old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() == heartbeatAt);
+    txnBatch.close();
+    int txnBatchSize = 200;
+    txnBatch = connection.fetchTransactionBatch(txnBatchSize, writer);
+    for(int i = 0; i < txnBatchSize; i++) {
+      txnBatch.beginNextTransaction();
+      if(i % 47 == 0) {
+        txnBatch.heartbeat();
+      }
+      if(i % 10 == 0) {
+        txnBatch.abort();
+      }
+      else {
+        txnBatch.commit();
+      }
+      if(i % 37 == 0) {
+        txnBatch.heartbeat();
+      }
+    }
+
+  }
+  @Test
+  public void testTransactionBatchEmptyAbort() throws Exception {
+    // 1) to partitioned table
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.abort();
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+
+    // 2) to unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    writer = new DelimitedInputWriter(fieldNames,",", endPt);
+    connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.abort();
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+    txnBatch.close();
+    connection.close();
+  }
+
+  @Test
+  public void testTransactionBatchCommit_Delimited() throws Exception {
+    testTransactionBatchCommit_Delimited(null);
+  }
+  @Test
+  public void testTransactionBatchCommit_DelimitedUGI() throws Exception {
+    testTransactionBatchCommit_Delimited(Utils.getUGI());
+  }
+  private void testTransactionBatchCommit_Delimited(UserGroupInformation ugi) throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+      partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
+
+    // 1st Txn
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+
+    // 2nd Txn
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+
+    // data should not be visible
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+    txnBatch.commit();
+
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+      "{2, Welcome to streaming}");
+
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+      , txnBatch.getCurrentTransactionState());
+
+
+    connection.close();
+
+
+    // To Unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    writer = new DelimitedInputWriter(fieldNames,",", endPt, conf, connection);
+
+    // 1st Txn
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+    connection.close();
+  }
+
+  @Test
+  public void testTransactionBatchCommit_Regex() throws Exception {
+    testTransactionBatchCommit_Regex(null);
+  }
+  @Test
+  public void testTransactionBatchCommit_RegexUGI() throws Exception {
+    testTransactionBatchCommit_Regex(Utils.getUGI());
+  }
+  private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+      partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    String regex = "([^,]*),(.*)";
+    StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf, connection);
+
+    // 1st Txn
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+
+    // 2nd Txn
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+
+    // data should not be visible
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+    txnBatch.commit();
+
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+      "{2, Welcome to streaming}");
+
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+      , txnBatch.getCurrentTransactionState());
+
+
+    connection.close();
+
+
+    // To Unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    regex = "([^:]*):(.*)";
+    writer = new StrictRegexWriter(regex, endPt, conf, connection);
+
+    // 1st Txn
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1:Hello streaming".getBytes());
+    txnBatch.commit();
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+    connection.close();
+  }
+
+  @Test
+  public void testTransactionBatchCommit_Json() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+    StrictJsonWriter writer = new StrictJsonWriter(endPt, connection);
+
+    // 1st Txn
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+            , txnBatch.getCurrentTransactionState());
+    String rec1 = "{\"id\" : 1, \"msg\": \"Hello streaming\"}";
+    txnBatch.write(rec1.getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+            , txnBatch.getCurrentTransactionState());
+
+    connection.close();
+    List<String> rs = queryTable(driver, "select * from " + dbName + "." + tblName);
+    Assert.assertEquals(1, rs.size());
+  }
+
+  @Test
+  public void testRemainingTransactions() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+
+    // 1) test with txn.Commit()
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    int batch=0;
+    int initialCount = txnBatch.remainingTransactions();
+    while (txnBatch.remainingTransactions()>0) {
+      txnBatch.beginNextTransaction();
+      Assert.assertEquals(--initialCount, txnBatch.remainingTransactions());
+      for (int rec=0; rec<2; ++rec) {
+        Assert.assertEquals(TransactionBatch.TxnState.OPEN
+                , txnBatch.getCurrentTransactionState());
+        txnBatch.write((batch * rec + ",Hello streaming").getBytes());
+      }
+      txnBatch.commit();
+      Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+              , txnBatch.getCurrentTransactionState());
+      ++batch;
+    }
+    Assert.assertEquals(0, txnBatch.remainingTransactions());
+    txnBatch.close();
+
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+            , txnBatch.getCurrentTransactionState());
+
+    // 2) test with txn.Abort()
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    batch=0;
+    initialCount = txnBatch.remainingTransactions();
+    while (txnBatch.remainingTransactions()>0) {
+      txnBatch.beginNextTransaction();
+      Assert.assertEquals(--initialCount,txnBatch.remainingTransactions());
+      for (int rec=0; rec<2; ++rec) {
+        Assert.assertEquals(TransactionBatch.TxnState.OPEN
+                , txnBatch.getCurrentTransactionState());
+        txnBatch.write((batch * rec + ",Hello streaming").getBytes());
+      }
+      txnBatch.abort();
+      Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+              , txnBatch.getCurrentTransactionState());
+      ++batch;
+    }
+    Assert.assertEquals(0, txnBatch.remainingTransactions());
+    txnBatch.close();
+
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+            , txnBatch.getCurrentTransactionState());
+
+    connection.close();
+  }
+
+  @Test
+  public void testTransactionBatchAbort() throws Exception {
+
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
+
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    txnBatch.abort();
+
+    checkNothingWritten(partLoc);
+
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+
+    txnBatch.close();
+    connection.close();
+
+    checkNothingWritten(partLoc);
+
+  }
+
+
+  @Test
+  public void testTransactionBatchAbortAndCommit() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt, connection);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    ShowLocksResponse resp = msClient.showLocks(new ShowLocksRequest());
+    Assert.assertEquals("LockCount", 1, resp.getLocksSize());
+    Assert.assertEquals("LockType", LockType.SHARED_READ, resp.getLocks().get(0).getType());
+    Assert.assertEquals("LockState", LockState.ACQUIRED, resp.getLocks().get(0).getState());
+    Assert.assertEquals("AgentInfo", agentInfo, resp.getLocks().get(0).getAgentInfo());
+    txnBatch.abort();
+
+    checkNothingWritten(partLoc);
+
+    Assert.assertEquals(TransactionBatch.TxnState.ABORTED
+            , txnBatch.getCurrentTransactionState());
+
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+            "{2, Welcome to streaming}");
+
+    txnBatch.close();
+    connection.close();
+  }
+
+  @Test
+  public void testMultipleTransactionBatchCommits() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
+    StreamingConnection connection = endPt.newConnection(true, "UT_" + Thread.currentThread().getName());
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("1,Hello streaming".getBytes());
+    txnBatch.commit();
+    String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+    checkDataWritten2(partLoc, 1, 10, 1, validationQuery, false, "1\tHello streaming");
+
+    txnBatch.beginNextTransaction();
+    txnBatch.write("2,Welcome to streaming".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten2(partLoc, 1, 10,  1, validationQuery, true, "1\tHello streaming",
+            "2\tWelcome to streaming");
+
+    txnBatch.close();
+
+    // 2nd Txn Batch
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("3,Hello streaming - once again".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten2(partLoc, 1, 20,  2, validationQuery, false, "1\tHello streaming",
+            "2\tWelcome to streaming", "3\tHello streaming - once again");
+
+    txnBatch.beginNextTransaction();
+    txnBatch.write("4,Welcome to streaming - once again".getBytes());
+    txnBatch.commit();
+
+    checkDataWritten2(partLoc, 1, 20,  2, validationQuery, true, "1\tHello streaming",
+            "2\tWelcome to streaming", "3\tHello streaming - once again",
+            "4\tWelcome to streaming - once again");
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch.getCurrentTransactionState());
+
+    txnBatch.close();
+
+    connection.close();
+  }
+
+  @Test
+  public void testInterleavedTransactionBatchCommits() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+            partitionVals);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+
+    // Acquire 1st Txn Batch
+    TransactionBatch txnBatch1 =  connection.fetchTransactionBatch(10, writer);
+    txnBatch1.beginNextTransaction();
+
+    // Acquire 2nd Txn Batch
+    DelimitedInputWriter writer2 = new DelimitedInputWriter(fieldNames, ",", endPt);
+    TransactionBatch txnBatch2 =  connection.fetchTransactionBatch(10, writer2);
+    txnBatch2.beginNextTransaction();
+
+    // Interleaved writes to both batches
+    txnBatch1.write("1,Hello streaming".getBytes());
+    txnBatch2.write("3,Hello streaming - once again".getBytes());
+
+    checkNothingWritten(partLoc);
+
+    txnBatch2.commit();
+
+    String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg";
+    checkDataWritten2(partLoc, 11, 20, 1,
+      validationQuery, true, "3\tHello streaming - once again");
+
+    txnBatch1.commit();
+    /*now both batches have committed (but not closed) so we for each primary file we expect a side
+    file to exist and indicate the true length of primary file*/
+    FileSystem fs = partLoc.getFileSystem(conf);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf,
+            msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
+    for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+      for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+        Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+        Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+        long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+        Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
+          lengthFileSize, lengthFileSize > 0);
+        long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+        long actualLength = stat.getLen();
+        Assert.assertTrue("", logicalLength == actualLength);
+      }
+    }
+    checkDataWritten2(partLoc, 1, 20, 2,
+      validationQuery, false,"1\tHello streaming", "3\tHello streaming - once again");
+
+    txnBatch1.beginNextTransaction();
+    txnBatch1.write("2,Welcome to streaming".getBytes());
+
+    txnBatch2.beginNextTransaction();
+    txnBatch2.write("4,Welcome to streaming - once again".getBytes());
+    //here each batch has written data and committed (to bucket0 since table only has 1 bucket)
+    //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length.  Furthermore, each bucket0
+    //has now received more data(logically - it's buffered) but it is not yet committed.
+    //lets check that side files exist, etc
+    dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
+    for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
+      for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
+        Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
+        Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile));
+        long lengthFileSize = fs.getFileStatus(lengthFile).getLen();
+        Assert.assertTrue("Expected " + lengthFile + " to be non empty. lengh=" +
+          lengthFileSize, lengthFileSize > 0);
+        long logicalLength = AcidUtils.getLogicalLength(fs, stat);
+        long actualLength = stat.getLen();
+        Assert.assertTrue("", logicalLength <= actualLength);
+      }
+    }
+    checkDataWritten2(partLoc, 1, 20, 2,
+      validationQuery, true,"1\tHello streaming", "3\tHello streaming - once again");
+
+    txnBatch1.commit();
+
+    checkDataWritten2(partLoc, 1, 20, 2,
+      validationQuery, false, "1\tHello streaming",
+        "2\tWelcome to streaming",
+        "3\tHello streaming - once again");
+
+    txnBatch2.commit();
+
+    checkDataWritten2(partLoc, 1, 20, 2,
+      validationQuery, true, "1\tHello streaming",
+        "2\tWelcome to streaming",
+        "3\tHello streaming - once again",
+        "4\tWelcome to streaming - once again");
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch1.getCurrentTransactionState());
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+            , txnBatch2.getCurrentTransactionState());
+
+    txnBatch1.close();
+    txnBatch2.close();
+
+    connection.close();
+  }
+
+  private static class WriterThd extends Thread {
+
+    private final StreamingConnection conn;
+    private final DelimitedInputWriter writer;
+    private final String data;
+    private Throwable error;
+
+    WriterThd(HiveEndPoint ep, String data) throws Exception {
+      super("Writer_" + data);
+      writer = new DelimitedInputWriter(fieldNames, ",", ep);
+      conn = ep.newConnection(false, "UT_" + Thread.currentThread().getName());
+      this.data = data;
+      setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread thread, Throwable throwable) {
+          error = throwable;
+          LOG.error("Thread " + thread.getName() + " died: " + throwable.getMessage(), throwable);
+        }
+      });
+    }
+
+    @Override
+    public void run() {
+      TransactionBatch txnBatch = null;
+      try {
+        txnBatch =  conn.fetchTransactionBatch(10, writer);
+        while (txnBatch.remainingTransactions() > 0) {
+          txnBatch.beginNextTransaction();
+          txnBatch.write(data.getBytes());
+          txnBatch.write(data.getBytes());
+          txnBatch.commit();
+        } // while
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      } finally {
+        if (txnBatch != null) {
+          try {
+            txnBatch.close();
+          } catch (Exception e) {
+            LOG.error("txnBatch.close() failed: " + e.getMessage(), e);
+            conn.close();
+          }
+        }
+        try {
+          conn.close();
+        } catch (Exception e) {
+          LOG.error("conn.close() failed: " + e.getMessage(), e);
+        }
+
+      }
+    }
+  }
+
+  @Test
+  public void testConcurrentTransactionBatchCommits() throws Exception {
+    final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+    List<WriterThd> writers = new ArrayList<WriterThd>(3);
+    writers.add(new WriterThd(ep, "1,Matrix"));
+    writers.add(new WriterThd(ep, "2,Gandhi"));
+    writers.add(new WriterThd(ep, "3,Silence"));
+
+    for(WriterThd w : writers) {
+      w.start();
+    }
+    for(WriterThd w : writers) {
+      w.join();
+    }
+    for(WriterThd w : writers) {
+      if(w.error != null) {
+        Assert.assertFalse("Writer thread" + w.getName() + " died: " + w.error.getMessage() +
+          " See log file for stack trace", true);
+      }
+    }
+  }
+
+
+  private ArrayList<SampleRec> dumpBucket(Path orcFile) throws IOException {
+    org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.getLocal(new Configuration());
+    Reader reader = OrcFile.createReader(orcFile,
+            OrcFile.readerOptions(conf).filesystem(fs));
+
+    RecordReader rows = reader.rows();
+    StructObjectInspector inspector = (StructObjectInspector) reader
+            .getObjectInspector();
+
+    System.out.format("Found Bucket File : %s \n", orcFile.getName());
+    ArrayList<SampleRec> result = new ArrayList<SampleRec>();
+    while (rows.hasNext()) {
+      Object row = rows.next(null);
+      SampleRec rec = (SampleRec) deserializeDeltaFileRow(row, inspector)[5];
+      result.add(rec);
+    }
+
+    return result;
+  }
+
+  // Assumes stored data schema = [acid fields],string,int,string
+  // return array of 6 fields, where the last field has the actual data
+  private static Object[] deserializeDeltaFileRow(Object row, StructObjectInspector inspector) {
+    List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+
+    WritableIntObjectInspector f0ins = (WritableIntObjectInspector) fields.get(0).getFieldObjectInspector();
+    WritableLongObjectInspector f1ins = (WritableLongObjectInspector) fields.get(1).getFieldObjectInspector();
+    WritableIntObjectInspector f2ins = (WritableIntObjectInspector) fields.get(2).getFieldObjectInspector();
+    WritableLongObjectInspector f3ins = (WritableLongObjectInspector) fields.get(3).getFieldObjectInspector();
+    WritableLongObjectInspector f4ins = (WritableLongObjectInspector)  fields.get(4).getFieldObjectInspector();
+    StructObjectInspector f5ins = (StructObjectInspector) fields.get(5).getFieldObjectInspector();
+
+    int f0 = f0ins.get(inspector.getStructFieldData(row, fields.get(0)));
+    long f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
+    int f2 = f2ins.get(inspector.getStructFieldData(row, fields.get(2)));
+    long f3 = f3ins.get(inspector.getStructFieldData(row, fields.get(3)));
+    long f4 = f4ins.get(inspector.getStructFieldData(row, fields.get(4)));
+    SampleRec f5 = deserializeInner(inspector.getStructFieldData(row, fields.get(5)), f5ins);
+
+    return new Object[] {f0, f1, f2, f3, f4, f5};
+  }
+
+  // Assumes row schema => string,int,string
+  private static SampleRec deserializeInner(Object row, StructObjectInspector inspector) {
+    List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+
+    WritableStringObjectInspector f0ins = (WritableStringObjectInspector) fields.get(0).getFieldObjectInspector();
+    WritableIntObjectInspector f1ins = (WritableIntObjectInspector) fields.get(1).getFieldObjectInspector();
+    WritableStringObjectInspector f2ins = (WritableStringObjectInspector) fields.get(2).getFieldObjectInspector();
+
+    String f0 = f0ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(0)));
+    int f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
+    String f2 = f2ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(2)));
+    return new SampleRec(f0, f1, f2);
+  }
+
+  @Test
+  public void testBucketing() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    dropDB(msClient, dbName3);
+    dropDB(msClient, dbName4);
+
+    // 1) Create two bucketed tables
+    String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames = "key1,key2,data".split(",");
+    String[] colTypes = "string,int,string".split(",");
+    String[] bucketNames = "key1,key2".split(",");
+    int bucketCount = 4;
+    createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
+            , null, dbLocation, bucketCount);
+
+    String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db";
+    dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames2 = "key3,key4,data2".split(",");
+    String[] colTypes2 = "string,int,string".split(",");
+    String[] bucketNames2 = "key3,key4".split(",");
+    createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2
+            , null, dbLocation2, bucketCount);
+
+
+    // 2) Insert data into both tables
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name0,1,Hello streaming".getBytes());
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+    txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+    txnBatch.commit();
+
+
+    HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
+    StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
+    DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2, connection);
+    TransactionBatch txnBatch2 =  connection2.fetchTransactionBatch(2, writer2);
+    txnBatch2.beginNextTransaction();
+
+    txnBatch2.write("name5,2,fact3".getBytes());  // bucket 0
+    txnBatch2.write("name8,2,fact3".getBytes());  // bucket 1
+    txnBatch2.write("name0,1,fact1".getBytes());  // bucket 2
+
+    txnBatch2.commit();
+
+    // 3 Check data distribution in  buckets
+
+    HashMap<Integer, ArrayList<SampleRec>> actual1 = dumpAllBuckets(dbLocation, tblName3);
+    HashMap<Integer, ArrayList<SampleRec>> actual2 = dumpAllBuckets(dbLocation2, tblName4);
+    System.err.println("\n  Table 1");
+    System.err.println(actual1);
+    System.err.println("\n  Table 2");
+    System.err.println(actual2);
+
+    // assert bucket listing is as expected
+    Assert.assertEquals("number of buckets does not match expectation", actual1.values().size(), 3);
+    Assert.assertEquals("records in bucket does not match expectation", actual1.get(0).size(), 2);
+    Assert.assertEquals("records in bucket does not match expectation", actual1.get(1).size(), 1);
+    Assert.assertTrue("bucket 2 shouldn't have been created", actual1.get(2) == null);
+    Assert.assertEquals("records in bucket does not match expectation", actual1.get(3).size(), 1);
+  }
+  private void runCmdOnDriver(String cmd) throws QueryFailedException {
+    boolean t = runDDL(driver, cmd);
+    Assert.assertTrue(cmd + " failed", t);
+  }
+
+
+  @Test
+  public void testFileDump() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    dropDB(msClient, dbName3);
+    dropDB(msClient, dbName4);
+
+    // 1) Create two bucketed tables
+    String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames = "key1,key2,data".split(",");
+    String[] colTypes = "string,int,string".split(",");
+    String[] bucketNames = "key1,key2".split(",");
+    int bucketCount = 4;
+    createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
+        , null, dbLocation, bucketCount);
+
+    String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db";
+    dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames2 = "key3,key4,data2".split(",");
+    String[] colTypes2 = "string,int,string".split(",");
+    String[] bucketNames2 = "key3,key4".split(",");
+    createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2
+        , null, dbLocation2, bucketCount);
+
+
+    // 2) Insert data into both tables
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name0,1,Hello streaming".getBytes());
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+    txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+    txnBatch.commit();
+
+    PrintStream origErr = System.err;
+    ByteArrayOutputStream myErr = new ByteArrayOutputStream();
+
+    // replace stderr and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation});
+    System.err.flush();
+    System.setErr(origErr);
+
+    String errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+    // since this test runs on local file system which does not have an API to tell if files or
+    // open or not, we are testing for negative case even though the bucket files are still open
+    // for writes (transaction batch not closed yet)
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+    HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
+    DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2);
+    StreamingConnection connection2 = endPt2.newConnection(false, agentInfo);
+    TransactionBatch txnBatch2 =  connection2.fetchTransactionBatch(2, writer2);
+    txnBatch2.beginNextTransaction();
+
+    txnBatch2.write("name5,2,fact3".getBytes());  // bucket 0
+    txnBatch2.write("name8,2,fact3".getBytes());  // bucket 1
+    txnBatch2.write("name0,1,fact1".getBytes());  // bucket 2
+    // no data for bucket 3 -- expect 0 length bucket file
+
+    txnBatch2.commit();
+
+    origErr = System.err;
+    myErr = new ByteArrayOutputStream();
+
+    // replace stderr and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation});
+    System.out.flush();
+    System.err.flush();
+    System.setErr(origErr);
+
+    errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+  }
+
+  @Test
+  public void testFileDumpCorruptDataFiles() throws Exception {
+    dropDB(msClient, dbName3);
+
+    // 1) Create two bucketed tables
+    String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames = "key1,key2,data".split(",");
+    String[] colTypes = "string,int,string".split(",");
+    String[] bucketNames = "key1,key2".split(",");
+    int bucketCount = 4;
+    createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
+        , null, dbLocation, bucketCount);
+
+    // 2) Insert data into both tables
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
+
+    // we need side file for this test, so we create 2 txn batch and test with only one
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name0,1,Hello streaming".getBytes());
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+    txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+    txnBatch.commit();
+
+    // intentionally corrupt some files
+    Path path = new Path(dbLocation);
+    Collection<String> files = FileDump.getAllFilesInPath(path, conf);
+    int readableFooter = -1;
+    for (String file : files) {
+      if (file.contains("bucket_00000")) {
+        // empty out the file
+        corruptDataFile(file, conf, Integer.MIN_VALUE);
+      } else if (file.contains("bucket_00001")) {
+        corruptDataFile(file, conf, -1);
+      } else if (file.contains("bucket_00002")) {
+        Assert.assertFalse("bucket 2 shouldn't have been created", true);
+      } else if (file.contains("bucket_00003")) {
+        corruptDataFile(file, conf, 100);
+      }
+    }
+
+    PrintStream origErr = System.err;
+    ByteArrayOutputStream myErr = new ByteArrayOutputStream();
+
+    // replace stderr and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation});
+    System.err.flush();
+    System.setErr(origErr);
+
+    String errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(true, errDump.contains("3 file(s) are corrupted"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+    origErr = System.err;
+    myErr = new ByteArrayOutputStream();
+
+    // replace stderr and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"});
+    System.err.flush();
+    System.setErr(origErr);
+
+    errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(true, errDump.contains("bucket_00000 recovered successfully!"));
+    Assert.assertEquals(true, errDump.contains("No readable footers found. Creating empty orc file."));
+    Assert.assertEquals(true, errDump.contains("bucket_00001 recovered successfully!"));
+    Assert.assertEquals(true, errDump.contains("bucket_00003 recovered successfully!"));
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+    // test after recovery
+    origErr = System.err;
+    myErr = new ByteArrayOutputStream();
+
+    // replace stdout and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation});
+    System.err.flush();
+    System.setErr(origErr);
+
+    errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+    // after recovery there shouldn't be any *_flush_length files
+    files = FileDump.getAllFilesInPath(path, conf);
+    for (String file : files) {
+      Assert.assertEquals(false, file.contains("_flush_length"));
+    }
+
+    txnBatch.close();
+  }
+
+  private void corruptDataFile(final String file, final Configuration conf, final int addRemoveBytes)
+      throws Exception {
+    Path bPath = new Path(file);
+    Path cPath = new Path(bPath.getParent(), bPath.getName() + ".corrupt");
+    FileSystem fs = bPath.getFileSystem(conf);
+    FileStatus fileStatus = fs.getFileStatus(bPath);
+    int len = addRemoveBytes == Integer.MIN_VALUE ? 0 : (int) fileStatus.getLen() + addRemoveBytes;
+    byte[] buffer = new byte[len];
+    FSDataInputStream fdis = fs.open(bPath);
+    fdis.readFully(0, buffer, 0, (int) Math.min(fileStatus.getLen(), buffer.length));
+    fdis.close();
+    FSDataOutputStream fdos = fs.create(cPath, true);
+    fdos.write(buffer, 0, buffer.length);
+    fdos.close();
+    fs.delete(bPath, false);
+    fs.rename(cPath, bPath);
+  }
+
+  @Test
+  public void testFileDumpCorruptSideFiles() throws Exception {
+    dropDB(msClient, dbName3);
+
+    // 1) Create two bucketed tables
+    String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+    String[] colNames = "key1,key2,data".split(",");
+    String[] colTypes = "string,int,string".split(",");
+    String[] bucketNames = "key1,key2".split(",");
+    int bucketCount = 4;
+    createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
+        , null, dbLocation, bucketCount);
+
+    // 2) Insert data into both tables
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
+    StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt, connection);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name0,1,Hello streaming".getBytes());
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+    txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+    txnBatch.write("name6,3,aHello streaming".getBytes());
+    txnBatch.commit();
+
+    Map<String,List<Long>> offsetMap = new HashMap<String,List<Long>>();
+    recordOffsets(conf, dbLocation, offsetMap);
+
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name01,11,-Hello streaming".getBytes());
+    txnBatch.write("name21,21,-Welcome to streaming".getBytes());
+    txnBatch.write("name41,21,-more Streaming unlimited".getBytes());
+    txnBatch.write("name51,21,-even more Streaming unlimited".getBytes());
+    txnBatch.write("name02,12,--Hello streaming".getBytes());
+    txnBatch.write("name22,22,--Welcome to streaming".getBytes());
+    txnBatch.write("name42,22,--more Streaming unlimited".getBytes());
+    txnBatch.write("name52,22,--even more Streaming unlimited".getBytes());
+    txnBatch.write("name7,4,aWelcome to streaming".getBytes());
+    txnBatch.write("name8,5,amore Streaming unlimited".getBytes());
+    txnBatch.write("name9,6,aeven more Streaming unlimited".getBytes());
+    txnBatch.write("name10,7,bHello streaming".getBytes());
+    txnBatch.write("name11,8,bWelcome to streaming".getBytes());
+    txnBatch.write("name12,9,bmore Streaming unlimited".getBytes());
+    txnBatch.write("name13,10,beven more Streaming unlimited".getBytes());
+    txnBatch.commit();
+
+    recordOffsets(conf, dbLocation, offsetMap);
+
+    // intentionally corrupt some files
+    Path path = new Path(dbLocation);
+    Collection<String> files = FileDump.getAllFilesInPath(path, conf);
+    for (String file : files) {
+      if (file.contains("bucket_00000")) {
+        corruptSideFile(file, conf, offsetMap, "bucket_00000", -1); // corrupt last entry
+      } else if (file.contains("bucket_00001")) {
+        corruptSideFile(file, conf, offsetMap, "bucket_00001", 0); // empty out side file
+      } else if (file.contains("bucket_00002")) {
+        corruptSideFile(file, conf, offsetMap, "bucket_00002", 3); // total 3 entries (2 valid + 1 fake)
+      } else if (file.contains("bucket_00003")) {
+        corruptSideFile(file, conf, offsetMap, "bucket_00003", 10); // total 10 entries (2 valid + 8 fake)
+      }
+    }
+
+    PrintStream origErr = System.err;
+    ByteArrayOutputStream myErr = new ByteArrayOutputStream();
+
+    // replace stderr and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation});
+    System.err.flush();
+    System.setErr(origErr);
+
+    String errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(true, errDump.contains("bucket_00000_flush_length [length: 11"));
+    Assert.assertEquals(true, errDump.contains("bucket_00001_flush_length [length: 0"));
+    Assert.assertEquals(true, errDump.contains("bucket_00002_flush_length [length: 24"));
+    Assert.assertEquals(true, errDump.contains("bucket_00003_flush_length [length: 80"));
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(true, errDump.contains("4 file(s) are corrupted"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+    origErr = System.err;
+    myErr = new ByteArrayOutputStream();
+
+    // replace stderr and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"});
+    System.err.flush();
+    System.setErr(origErr);
+
+    errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(true, errDump.contains("bucket_00000 recovered successfully!"));
+    Assert.assertEquals(true, errDump.contains("bucket_00001 recovered successfully!"));
+    Assert.assertEquals(true, errDump.contains("bucket_00002 recovered successfully!"));
+    Assert.assertEquals(true, errDump.contains("bucket_00003 recovered successfully!"));
+    List<Long> offsets = offsetMap.get("bucket_00000");
+    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString()));
+    offsets = offsetMap.get("bucket_00001");
+    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString()));
+    offsets = offsetMap.get("bucket_00002");
+    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString()));
+    offsets = offsetMap.get("bucket_00003");
+    Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString()));
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+    // test after recovery
+    origErr = System.err;
+    myErr = new ByteArrayOutputStream();
+
+    // replace stdout and run command
+    System.setErr(new PrintStream(myErr));
+    FileDump.main(new String[]{dbLocation});
+    System.err.flush();
+    System.setErr(origErr);
+
+    errDump = new String(myErr.toByteArray());
+    Assert.assertEquals(false, errDump.contains("Exception"));
+    Assert.assertEquals(false, errDump.contains("file(s) are corrupted"));
+    Assert.assertEquals(false, errDump.contains("is still open for writes."));
+
+    // after recovery there shouldn't be any *_flush_length files
+    files = FileDump.getAllFilesInPath(path, conf);
+    for (String file : files) {
+      Assert.assertEquals(false, file.contains("_flush_length"));
+    }
+
+    txnBatch.close();
+  }
+
+  private void corruptSideFile(final String file, final HiveConf conf,
+      final Map<String, List<Long>> offsetMap, final String key, final int numEntries)
+      throws IOException {
+    Path dataPath = new Path(file);
+    Path sideFilePath = OrcAcidUtils.getSideFile(dataPath);
+    Path cPath = new Path(sideFilePath.getParent(), sideFilePath.getName() + ".corrupt");
+    FileSystem fs = sideFilePath.getFileSystem(conf);
+    List<Long> offsets = offsetMap.get(key);
+    long lastOffset = offsets.get(offsets.size() - 1);
+    FSDataOutputStream fdos = fs.create(cPath, true);
+    // corrupt last entry
+    if (numEntries < 0) {
+      byte[] lastOffsetBytes = longToBytes(lastOffset);
+      for (int i = 0; i < offsets.size() - 1; i++) {
+        fdos.writeLong(offsets.get(i));
+      }
+
+      fdos.write(lastOffsetBytes, 0, 3);
+    } else if (numEntries > 0) {
+      int firstRun = Math.min(offsets.size(), numEntries);
+      // add original entries
+      for (int i=0; i < firstRun; i++) {
+        fdos.writeLong(offsets.get(i));
+      }
+
+      // add fake entries
+      int remaining = numEntries - firstRun;
+      for (int i = 0; i < remaining; i++) {
+        fdos.writeLong(lastOffset + ((i + 1) * 100));
+      }
+    }
+
+    fdos.close();
+    fs.delete(sideFilePath, false);
+    fs.rename(cPath, sideFilePath);
+  }
+
+  private  byte[] longToBytes(long x) {
+    ByteBuffer buffer = ByteBuffer.allocate(8);
+    buffer.putLong(x);
+    return buffer.array();
+  }
+
+  private void recordOffsets(final HiveConf conf, final String dbLocation,
+      final Map<String, List<Long>> offsetMap) throws IOException {
+    Path path = new Path(dbLocation);
+    Collection<String> files = FileDump.getAllFilesInPath(path, conf);
+    for (String file: files) {
+      Path bPath = new Path(file);
+      FileSystem fs = bPath.getFileSystem(conf);
+      FileStatus fileStatus = fs.getFileStatus(bPath);
+      long len = fileStatus.getLen();
+
+      if (file.contains("bucket_00000")) {
+        if (offsetMap.containsKey("bucket_00000")) {
+          List<Long> offsets = offsetMap.get("bucket_00000");
+          offsets.add(len);
+          offsetMap.put("bucket_00000", offsets);
+        } else {
+          List<Long> offsets = new ArrayList<Long>();
+          offsets.add(len);
+          offsetMap.put("bucket_00000", offsets);
+        }
+      } else if (file.contains("bucket_00001")) {
+        if (offsetMap.containsKey("bucket_00001")) {
+          List<Long> offsets = offsetMap.get("bucket_00001");
+          offsets.add(len);
+          offsetMap.put("bucket_00001", offsets);
+        } else {
+          List<Long> offsets = new ArrayList<Long>();
+          offsets.add(len);
+          offsetMap.put("bucket_00001", offsets);
+        }
+      } else if (file.contains("bucket_00002")) {
+        if (offsetMap.containsKey("bucket_00002")) {
+          List<Long> offsets = offsetMap.get("bucket_00002");
+          offsets.add(len);
+          offsetMap.put("bucket_00002", offsets);
+        } else {
+          List<Long> offsets = new ArrayList<Long>();
+          offsets.add(len);
+          offsetMap.put("bucket_00002", offsets);
+        }
+      } else if (file.contains("bucket_00003")) {
+        if (offsetMap.containsKey("bucket_00003")) {
+          List<Long> offsets = offsetMap.get("bucket_00003");
+          offsets.add(len);
+          offsetMap.put("bucket_00003", offsets);
+        } else {
+          List<Long> offsets = new ArrayList<Long>();
+          offsets.add(len);
+          offsetMap.put("bucket_00003", offsets);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testErrorHandling() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    runCmdOnDriver("create database testErrors");
+    runCmdOnDriver("use testErrors");
+    runCmdOnDriver("create table T(a int, b int) clustered by (b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testErrors", "T", null);
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter innerWriter = new DelimitedInputWriter("a,b".split(","),",", endPt, connection);
+    FaultyWriter writer = new FaultyWriter(innerWriter);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.close();
+    txnBatch.heartbeat();//this is no-op on closed batch
+    txnBatch.abort();//ditto
+    GetOpenTxnsInfoResponse r = msClient.showTxns();
+    Assert.assertEquals("HWM didn't match", 17, r.getTxn_high_water_mark());
+    List<TxnInfo> ti = r.getOpen_txns();
+    Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
+    Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+
+    Exception expectedEx = null;
+    try {
+      txnBatch.beginNextTransaction();
+    }
+    catch(IllegalStateException ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("beginNextTransaction() should have failed",
+      expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+    expectedEx = null;
+    try {
+      txnBatch.write("name0,1,Hello streaming".getBytes());
+    }
+    catch(IllegalStateException ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("write()  should have failed",
+      expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+    expectedEx = null;
+    try {
+      txnBatch.commit();
+    }
+    catch(IllegalStateException ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("commit() should have failed",
+      expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+
+    txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+    txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+    txnBatch.commit();
+
+    //test toString()
+    String s = txnBatch.toString();
+    Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId())));
+    Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CO]"));
+
+    expectedEx = null;
+    txnBatch.beginNextTransaction();
+    writer.enableErrors();
+    try {
+      txnBatch.write("name6,2,Doh!".getBytes());
+    }
+    catch(StreamingIOFailure ex) {
+      expectedEx = ex;
+      txnBatch.getCurrentTransactionState();
+      txnBatch.getCurrentTxnId();//test it doesn't throw ArrayIndexOutOfBounds...
+    }
+    Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"),
+      expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
+    expectedEx = null;
+    try {
+      txnBatch.commit();
+    }
+    catch(IllegalStateException ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("commit() should have failed",
+      expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
+
+    //test toString()
+    s = txnBatch.toString();
+    Assert.assertTrue("Actual: " + s, s.contains("LastUsed " + JavaUtils.txnIdToString(txnBatch.getCurrentTxnId())));
+    Assert.assertTrue("Actual: " + s, s.contains("TxnStatus[CA]"));
+
+    r = msClient.showTxns();
+    Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
+    ti = r.getOpen_txns();
+    Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
+    Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
+    //txnid 3 was committed and thus not open
+    Assert.assertEquals("wrong status ti(2)", TxnState.ABORTED, ti.get(2).getState());
+
+    writer.disableErrors();
+    txnBatch =  connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name2,2,Welcome to streaming".getBytes());
+    writer.enableErrors();
+    expectedEx = null;
+    try {
+      txnBatch.commit();
+    }
+    catch(StreamingIOFailure ex) {
+      expectedEx = ex;
+    }
+    Assert.assertTrue("Wrong exception: " + (expectedEx != null ? expectedEx.getMessage() : "?"),
+      expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
+
+    r = msClient.showTxns();
+    Assert.assertEquals("HWM didn't match", 21, r.getTxn_high_water_mark());
+    ti = r.getOpen_txns();
+    Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
+    Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());
+
+    txnBatch.abort();
+  }
+
+    // assumes un partitioned table
+  // returns a map<bucketNum, list<record> >
+  private HashMap<Integer, ArrayList<SampleRec>> dumpAllBuckets(String dbLocation, String tableName)
+          throws IOException {
+    HashMap<Integer, ArrayList<SampleRec>> result = new HashMap<Integer, ArrayList<SampleRec>>();
+
+    for (File deltaDir : new File(dbLocation + "/" + tableName).listFiles()) {
+      if(!deltaDir.getName().startsWith("delta")) {
+        continue;
+      }
+      File[] bucketFiles = deltaDir.listFiles(new FileFilter() {
+        @Override
+        public boolean accept(File pathname) {
+          String name = pathname.getName();
+          return !name.startsWith("_") && !name.startsWith(".");
+        }
+      });
+      for (File bucketFile : bucketFiles) {
+        if(bucketFile.toString().endsWith("length")) {
+          continue;
+        }
+        Integer bucketNum = getBucketNumber(bucketFile);
+        ArrayList<SampleRec>  recs = dumpBucket(new Path(bucketFile.toString()));
+        result.put(bucketNum, recs);
+      }
+    }
+    return result;
+  }
+
+  //assumes bucket_NNNNN format of file name
+  private Integer getBucketNumber(File bucketFile) {
+    String fname = bucketFile.getName();
+    int start = fname.indexOf('_');
+    String number = fname.substring(start+1, fname.length());
+    return Integer.parseInt(number);
+  }
+
+  // delete db and all tables in it
+  public static void dropDB(IMetaStoreClient client, String databaseName) {
+    try {
+      for (String table : client.listTableNamesByFilter(databaseName, "", (short)-1)) {
+        client.dropTable(databaseName, table, true, true);
+      }
+      client.dropDatabase(databaseName);
+    } catch (TException e) {
+    }
+
+  }
+
+
+
+  ///////// -------- UTILS ------- /////////
+  // returns Path of the partition created (if any) else Path of table
+  public static Path createDbAndTable(IDriver driver, String databaseName,
+                                      String tableName, List<String> partVals,
+                                      String[] colNames, String[] colTypes,
+                                      String[] bucketCols,
+                                      String[] partNames, String dbLocation, int bucketCount)
+          throws Exception {
+
+    String dbUri = "raw://" + new Path(dbLocation).toUri().toString();
+    String tableLoc = dbUri + Path.SEPARATOR + tableName;
+
+    runDDL(driver, "create database IF NOT EXISTS " + databaseName + " location '" + dbUri + "'");
+    runDDL(driver, "use " + databaseName);
+    String crtTbl = "create table " + tableName +
+            " ( " +  getTableColumnsStr(colNames,colTypes) + " )" +
+            getPartitionStmtStr(partNames) +
+            " clustered by ( " + join(bucketCols, ",") + " )" +
+            " into " + bucketCount + " buckets " +
+            " stored as orc " +
+            " location '" + tableLoc +  "'" +
+            " TBLPROPERTIES ('transactional'='true') ";
+    runDDL(driver, crtTbl);
+    if(partNames!=null && partNames.length!=0) {
+      return addPartition(driver, tableName, partVals, partNames);
+    }
+    return new Path(tableLoc);
+  }
+
+  private static Path addPartition(IDriver driver, String tableName, List<String> partVals, String[] partNames)
+      throws Exception {
+    String partSpec = getPartsSpec(partNames, partVals);
+    String addPart = "alter table " + tableName + " add partition ( " + partSpec  + " )";
+    runDDL(driver, addPart);
+    return getPartitionPath(driver, tableName, partSpec);
+  }
+
+  private static Path getPartitionPath(IDriver driver, String tableName, String partSpec) throws Exception {
+    ArrayList<String> res = queryTable(driver, "describe extended " + tableName + " PARTITION (" + partSpec + ")");
+    String partInfo = res.get(res.size() - 1);
+    int start = partInfo.indexOf("location:") + "location:".length();
+    int end = partInfo.indexOf(",",start);
+    return new Path( partInfo.substring(start,end) );
+  }
+
+  private static String getTableColumnsStr(String[] colNames, String[] colTypes) {
+    StringBuilder sb = new StringBuilder();
+    for (int i=0; i < colNames.length; ++i) {
+      sb.append(colNames[i] + " " + colTypes[i]);
+      if (i<colNames.length-1) {
+        sb.append(",");
+      }
+    }
+    return sb.toString();
+  }
+
+  // converts partNames into "partName1 string, partName2 string"
+  private static String getTablePartsStr(String[] partNames) {
+    if (partNames==null || partNames.length==0) {
+      return "";
+    }
+    StringBuilder sb = new StringBuilder();
+    for (int i=0; i < partNames.length; ++i) {
+      sb.append(partNames[i] + " string");
+      if (i < partNames.length-1) {
+        sb.append(",");
+      }
+    }
+    return sb.toString();
+  }
+
+  // converts partNames,partVals into "partName1=val1, partName2=val2"
+  private static String getPartsSpec(String[] partNames, List<String> partVals) {
+    StringBuilder sb = new StringBuilder();
+    for (int i=0; i < partVals.size(); ++i) {
+      sb.append(partNames[i] + " = '" + partVals.get(i) + "'");
+      if(i < partVals.size()-1) {
+        sb.append(",");
+      }
+    }
+    return sb.toString();
+  }
+
+  private static String join(String[] values, String delimiter) {
+    if(values==null) {
+      return null;
+    }
+    StringBuilder strbuf = new StringBuilder();
+
+    boolean first = true;
+
+    for (Object value : values)  {
+      if (!first) { strbuf.append(delimiter); } else { first = false; }
+      strbuf.append(value.toString());
+    }
+
+    return strbuf.toString();
+  }
+  private static String getPartitionStmtStr(String[] partNames) {
+    if ( partNames == null || partNames.length == 0) {
+      return "";
+    }
+    return " partitioned by (" + getTablePartsStr(partNames) + " )";
+  }
+
+  private static boolean runDDL(IDriver driver, String sql) throws QueryFailedException {
+    LOG.debug(sql);
+    System.out.println(sql);
+    //LOG.debug("Running Hive Query: "+ sql);
+    CommandProcessorResponse cpr = driver.run(sql);
+    if (cpr.getResponseCode() == 0) {
+      return true;
+    }
+    LOG.error("Statement: " + sql + " failed: " + cpr);
+    return false;
+  }
+
+
+  public static ArrayList<String> queryTable(IDriver driver, String query) throws IOException {
+    CommandProcessorResponse cpr = driver.run(query);
+    if(cpr.getResponseCode() != 0) {
+      throw new RuntimeException(query + " failed: " + cpr);
+    }
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    return res;
+  }
+
+  private static class SampleRec {
+    public String field1;
+    public int field2;
+    public String field3;
+
+    public SampleRec(String field1, int field2, String field3) {
+      this.field1 = field1;
+      this.field2 = field2;
+      this.field3 = field3;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      SampleRec that = (SampleRec) o;
+
+      if (field2 != that.field2) {
+        return false;
+      }
+      if (field1 != null ? !field1.equals(that.field1) : that.field1 != null) {
+        return false;
+      }
+      return !(field3 != null ? !field3.equals(that.field3) : that.field3 != null);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = field1 != null ? field1.hashCode() : 0;
+      result = 31 * result + field2;
+      result = 31 * result + (field3 != null ? field3.hashCode() : 0);
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return " { " +
+              "'" + field1 + '\'' +
+              "," + field2 +
+              ",'" + field3 + '\'' +
+              " }";
+    }
+  }
+  /**
+   * This is test-only wrapper around the real RecordWriter.
+   * It can simulate faults from lower levels to test error handling logic.
+   */
+  private static final class FaultyWriter implements RecordWriter {
+    private final RecordWriter delegate;
+    private boolean shouldThrow = false;
+
+    private FaultyWriter(RecordWriter delegate) {
+      assert delegate != null;
+      this.delegate = delegate;
+    }
+    @Override
+    public void write(long writeId, byte[] record) throws StreamingException {
+      delegate.write(writeId, record);
+      produceFault();
+    }
+    @Override
+    public void flush() throws StreamingException {
+      delegate.flush();
+      produceFault();
+    }
+    @Override
+    public void clear() throws StreamingException {
+      delegate.clear();
+    }
+    @Override
+    public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingException {
+      delegate.newBatch(minTxnId, maxTxnID);
+    }
+    @Override
+    public void closeBatch() throws StreamingException {
+      delegate.closeBatch();
+    }
+
+    /**
+     * allows testing of "unexpected" errors
+     * @throws StreamingIOFailure
+     */
+    private void produceFault() throws StreamingIOFailure {
+      if(shouldThrow) {
+        throw new StreamingIOFailure("Simulated fault occurred");
+      }
+    }
+    void enableErrors() {
+      shouldThrow = true;
+    }
+    void disableErrors() {
+      shouldThrow = false;
+    }
+  }
+}