HIVE-15691 Create StrictRegexWriter to work with RegexSerializer for Flume Hive Sink...
author Eugene Koifman <ekoifman@hortonworks.com>
Wed, 22 Mar 2017 20:22:08 +0000 (13:22 -0700)
committer Eugene Koifman <ekoifman@hortonworks.com>
Wed, 22 Mar 2017 20:22:08 +0000 (13:22 -0700)
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java [new file with mode: 0644]
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java

diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictRegexWriter.java
new file mode 100644 (file)
index 0000000..78987ab
--- /dev/null
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.hcatalog.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.RegexSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Streaming writer for text records whose columns are extracted with a regular
+ * expression. Parsing is delegated to org.apache.hadoop.hive.serde2.RegexSerDe.
+ */
+public class StrictRegexWriter extends AbstractRecordWriter {
+  private final RegexSerDe serde;
+  private final StructObjectInspector recordObjInspector;
+  private final ObjectInspector[] bucketObjInspectors;
+  private final StructField[] bucketStructFields;
+
+  /**
+   * Creates a writer with neither an explicit regex nor a HiveConf (both are passed on
+   * as null to the four-argument constructor, which describes the null-regex handling).
+   *
+   * @param endPoint the end point to write to
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(HiveEndPoint endPoint, StreamingConnection conn)
+          throws ConnectionError, SerializationError, StreamingException {
+    this(null, endPoint, null, conn);
+  }
+
+  /**
+   * Creates a writer without an explicit regex (null is passed on to the four-argument
+   * constructor, which describes the null-regex handling).
+   *
+   * @param endPoint the end point to write to
+   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+          throws ConnectionError, SerializationError, StreamingException {
+    this(null, endPoint, conf, conn);
+  }
+
+  /**
+   * Creates a writer, builds its RegexSerDe, and caches the object inspectors and struct
+   * fields for the whole record and for the bucketed columns.
+   *
+   * @param regex to parse the data. When null, this writer does not set
+   *              RegexSerDe.INPUT_REGEX itself and the serde is initialized from the table
+   *              metadata only (presumably the table's own serde properties must then
+   *              provide the regex - verify against RegexSerDe).
+   * @param endPoint the end point to write to
+   * @param conf a Hive conf object. Should be null if not using advanced Hive settings.
+   * @param conn connection this Writer is to be used with
+   * @throws ConnectionError
+   * @throws SerializationError if the serde cannot be initialized or its ObjectInspector
+   *                            cannot be obtained
+   * @throws StreamingException
+   */
+  public StrictRegexWriter(String regex, HiveEndPoint endPoint, HiveConf conf, StreamingConnection conn)
+          throws ConnectionError, SerializationError, StreamingException {
+    super(endPoint, conf, conn);
+    this.serde = createSerde(tbl, conf, regex);
+    // get ObjInspectors for entire record and bucketed cols
+    try {
+      recordObjInspector = ( StructObjectInspector ) serde.getObjectInspector();
+      this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+    }
+
+    // get StructFields for bucketed cols
+    bucketStructFields = new StructField[bucketIds.size()];
+    List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+    for (int i = 0; i < bucketIds.size(); i++) {
+      bucketStructFields[i] = allFields.get(bucketIds.get(i));
+    }
+  }
+
+  /** @return the RegexSerDe created for this writer */
+  @Override
+  public AbstractSerDe getSerde() {
+    return serde;
+  }
+
+  /** @return the object inspector the serde reports for a whole record */
+  @Override
+  protected StructObjectInspector getRecordObjectInspector() {
+    return recordObjInspector;
+  }
+
+  /** @return the struct fields of the bucketed columns, in the order of bucketIds */
+  @Override
+  protected StructField[] getBucketStructFields() {
+    return bucketStructFields;
+  }
+
+  /** @return the object inspectors computed for the bucketed columns */
+  @Override
+  protected ObjectInspector[] getBucketObjectInspectors() {
+    return bucketObjInspectors;
+  }
+
+  /**
+   * Parses the record with the serde, picks its bucket via getBucket, and inserts the
+   * resulting row through that bucket's record updater.
+   *
+   * @param transactionId id of the transaction the insert is made under
+   * @param record the raw record bytes, see {@link #encode(byte[])}
+   * @throws StreamingIOFailure if an IOException is raised while handling the record
+   * @throws SerializationError if the record cannot be parsed
+   */
+  @Override
+  public void write(long transactionId, byte[] record)
+          throws StreamingIOFailure, SerializationError {
+    try {
+      Object encodedRow = encode(record);
+      int bucket = getBucket(encodedRow);
+      getRecordUpdater(bucket).insert(transactionId, encodedRow);
+    } catch (IOException e) {
+      throw new StreamingIOFailure("Error writing record in transaction("
+              + transactionId + ")", e);
+    }
+  }
+
+  /**
+   * Creates and initializes a RegexSerDe from the table metadata.
+   *
+   * @param tbl   used to create serde
+   * @param conf  used to create serde
+   * @param regex used as RegexSerDe.INPUT_REGEX; when null the property is left as
+   *              returned by MetaStoreUtils.getTableMetadata
+   * @return the initialized serde
+   * @throws SerializationError if serde could not be initialized
+   */
+  private static RegexSerDe createSerde(Table tbl, HiveConf conf, String regex)
+          throws SerializationError {
+    try {
+      Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
+      // Properties.setProperty rejects null values with a NullPointerException, and two
+      // of the public constructors pass a null regex, so only override when one is given.
+      if (regex != null) {
+        tableProps.setProperty(RegexSerDe.INPUT_REGEX, regex);
+      }
+      List<String> tableColumns = getCols(tbl);
+      tableProps.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(tableColumns, ","));
+      RegexSerDe serde = new RegexSerDe();
+      SerDeUtils.initializeSerDe(serde, conf, tableProps, null);
+      return serde;
+    } catch (SerDeException e) {
+      throw new SerializationError("Error initializing serde " + RegexSerDe.class.getName(), e);
+    }
+  }
+
+  /**
+   * Collects the lower-cased names of the table's storage-descriptor columns, in order.
+   *
+   * @param table table whose columns are listed
+   * @return the column names, lower-cased
+   */
+  private static List<String> getCols(Table table) {
+    List<FieldSchema> cols = table.getSd().getCols();
+    List<String> colNames = new ArrayList<String>(cols.size());
+    for (FieldSchema col : cols) {
+      // Locale.ROOT keeps the result independent of the JVM's default locale
+      colNames.add(col.getName().toLowerCase(java.util.Locale.ROOT));
+    }
+    return colNames;
+  }
+
+  /**
+   * Encode Utf8 encoded string bytes using RegexSerDe
+   *
+   * @param utf8StrRecord the record bytes, wrapped in a Text and handed to the serde
+   * @return The encoded object
+   * @throws SerializationError if the serde fails to deserialize the record
+   */
+  @Override
+  public Object encode(byte[] utf8StrRecord) throws SerializationError {
+    try {
+      Text blob = new Text(utf8StrRecord);
+      return serde.deserialize(blob);
+    } catch (SerDeException e) {
+      throw new SerializationError("Unable to convert byte[] record into Object", e);
+    }
+  }
+
+}
index bf29993..8ea58e6 100644 (file)
@@ -64,10 +64,6 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.orc.impl.OrcAcidUtils;
-import org.apache.orc.tools.FileDump;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
@@ -82,11 +78,15 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
+import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.orc.impl.OrcAcidUtils;
+import org.apache.orc.tools.FileDump;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.Assert;
@@ -485,9 +485,9 @@ public class TestStreaming {
 
     NullWritable key = rr.createKey();
     OrcStruct value = rr.createValue();
-    for (int i = 0; i < records.length; i++) {
+    for (String record : records) {
       Assert.assertEquals(true, rr.next(key, value));
-      Assert.assertEquals(records[i], value.toString());
+      Assert.assertEquals(record, value.toString());
     }
     Assert.assertEquals(false, rr.next(key, value));
   }
@@ -787,6 +787,75 @@ public class TestStreaming {
   }
 
   @Test
+  public void testTransactionBatchCommit_Regex() throws Exception {
+    // Run the shared regex-writer commit scenario with a null UserGroupInformation.
+    testTransactionBatchCommit_Regex(null);
+  }
+  @Test
+  public void testTransactionBatchCommit_RegexUGI() throws Exception {
+    // Run the shared regex-writer commit scenario with the UGI returned by Utils.getUGI().
+    testTransactionBatchCommit_Regex(Utils.getUGI());
+  }
+  /**
+   * Shared scenario for the StrictRegexWriter commit tests: writes and commits records
+   * against an end point created with partitionVals, then against one created without
+   * partition values, asserting the transaction state along the way.
+   *
+   * @param ugi passed to HiveEndPoint.newConnection for both connections; may be null
+   */
+  private void testTransactionBatchCommit_Regex(UserGroupInformation ugi) throws Exception {
+    // StrictRegexWriter.encode documents its input as UTF-8 bytes, so encode explicitly
+    // instead of relying on the platform default charset.
+    final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
+      partitionVals);
+    StreamingConnection connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    String regex = "([^,]*),(.*)";
+    StrictRegexWriter writer = new StrictRegexWriter(regex, endPt, conf, connection);
+
+    // 1st Txn
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1,Hello streaming".getBytes(utf8));
+    txnBatch.commit();
+
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+
+    // 2nd Txn
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("2,Welcome to streaming".getBytes(utf8));
+
+    // data should not be visible
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+
+    txnBatch.commit();
+
+    checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+      "{2, Welcome to streaming}");
+
+    txnBatch.close();
+    Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
+      , txnBatch.getCurrentTransactionState());
+
+    connection.close();
+
+    // To Unpartitioned table
+    endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    connection = endPt.newConnection(true, conf, ugi, "UT_" + Thread.currentThread().getName());
+    regex = "([^:]*):(.*)";
+    writer = new StrictRegexWriter(regex, endPt, conf, connection);
+
+    // 1st Txn
+    txnBatch =  connection.fetchTransactionBatch(10, writer);
+    txnBatch.beginNextTransaction();
+    Assert.assertEquals(TransactionBatch.TxnState.OPEN
+      , txnBatch.getCurrentTransactionState());
+    txnBatch.write("1:Hello streaming".getBytes(utf8));
+    txnBatch.commit();
+
+    Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
+      , txnBatch.getCurrentTransactionState());
+
+    // Close the batch before its connection, as done for the first batch above.
+    txnBatch.close();
+    connection.close();
+  }
+  
+  @Test
   public void testTransactionBatchCommit_Json() throws Exception {
     HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName,
             partitionVals);