HIVE-19965: Make HiveEndPoint use IMetaStoreClient.add_partition (Eugene Koifman...
authorEugene Koifman <ekoifman@apache.org>
Sun, 24 Jun 2018 22:27:12 +0000 (15:27 -0700)
committerEugene Koifman <ekoifman@apache.org>
Sun, 24 Jun 2018 22:27:12 +0000 (15:27 -0700)
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java

index 3ee19dd..3604630 100644 (file)
 package org.apache.hive.hcatalog.streaming;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.cli.CliSessionState;
@@ -55,6 +61,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Information about the hive end point (i.e. table or partition) to write to.
@@ -294,42 +301,42 @@ public class HiveEndPoint {
     private final String agentInfo;
 
     /**
-     * @param endPoint end point to connect to
-     * @param ugi on behalf of whom streaming is done. cannot be null
-     * @param conf HiveConf object
+     * @param endPoint   end point to connect to
+     * @param ugi        on behalf of whom streaming is done. cannot be null
+     * @param conf       HiveConf object
      * @param createPart create the partition if it does not exist
-     * @throws ConnectionError if there is trouble connecting
-     * @throws InvalidPartition if specified partition does not exist (and createPart=false)
-     * @throws InvalidTable if specified table does not exist
+     * @throws ConnectionError         if there is trouble connecting
+     * @throws InvalidPartition        if specified partition does not exist (and createPart=false)
+     * @throws InvalidTable            if specified table does not exist
      * @throws PartitionCreationFailed if createPart=true and not able to create partition
      */
     private ConnectionImpl(HiveEndPoint endPoint, UserGroupInformation ugi,
-                           HiveConf conf, boolean createPart, String agentInfo)
-            throws ConnectionError, InvalidPartition, InvalidTable
-                   , PartitionCreationFailed {
+        HiveConf conf, boolean createPart, String agentInfo)
+        throws ConnectionError, InvalidPartition, InvalidTable
+        , PartitionCreationFailed {
       this.endPt = endPoint;
       this.ugi = ugi;
       this.agentInfo = agentInfo;
-      this.username = ugi==null ? System.getProperty("user.name") : ugi.getShortUserName();
-      if (conf==null) {
+      this.username = ugi == null ? System.getProperty("user.name") : ugi.getShortUserName();
+      if (conf == null) {
         conf = HiveEndPoint.createHiveConf(this.getClass(), endPoint.metaStoreUri);
+      } else {
+        overrideConfSettings(conf);
       }
-      else {
-          overrideConfSettings(conf);
-      }
-      this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
+      this.secureMode = ugi == null ? false : ugi.hasKerberosCredentials();
       this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
       // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
       // isolated from the other transaction related RPC calls.
       this.heartbeaterMSClient = getMetaStoreClient(endPoint, conf, secureMode);
       checkEndPoint(endPoint, msClient);
-      if (createPart  &&  !endPoint.partitionVals.isEmpty()) {
+      if (createPart && !endPoint.partitionVals.isEmpty()) {
         createPartitionIfNotExists(endPoint, msClient, conf);
       }
     }
 
     /**
      * Checks the validity of endpoint
+     *
      * @param endPoint the HiveEndPoint to be checked
      * @param msClient the metastore client
      * @throws InvalidTable
@@ -372,13 +379,13 @@ public class HiveEndPoint {
      */
     @Override
     public void close() {
-      if (ugi==null) {
+      if (ugi == null) {
         msClient.close();
         heartbeaterMSClient.close();
         return;
       }
       try {
-        ugi.doAs (
+        ugi.doAs(
             new PrivilegedExceptionAction<Void>() {
               @Override
               public Void run() throws Exception {
@@ -386,7 +393,7 @@ public class HiveEndPoint {
                 heartbeaterMSClient.close();
                 return null;
               }
-            } );
+            });
         try {
           FileSystem.closeAllForUGI(ugi);
         } catch (IOException exception) {
@@ -408,91 +415,74 @@ public class HiveEndPoint {
      * Acquires a new batch of transactions from Hive.
      *
      * @param numTransactions is a hint from client indicating how many transactions client needs.
-     * @param recordWriter  Used to write record. The same writer instance can
-     *                      be shared with another TransactionBatch (to the same endpoint)
-     *                      only after the first TransactionBatch has been closed.
-     *                      Writer will be closed when the TransactionBatch is closed.
+     * @param recordWriter    Used to write record. The same writer instance can
+     *                        be shared with another TransactionBatch (to the same endpoint)
+     *                        only after the first TransactionBatch has been closed.
+     *                        Writer will be closed when the TransactionBatch is closed.
      * @return
-     * @throws StreamingIOFailure if failed to create new RecordUpdater for batch
+     * @throws StreamingIOFailure          if failed to create new RecordUpdater for batch
      * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
-     * @throws ImpersonationFailed failed to run command as proxyUser
+     * @throws ImpersonationFailed         failed to run command as proxyUser
      * @throws InterruptedException
      */
     @Override
     public TransactionBatch fetchTransactionBatch(final int numTransactions,
-                                                      final RecordWriter recordWriter)
-            throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed
-                  , InterruptedException {
-      if (ugi==null) {
+        final RecordWriter recordWriter)
+        throws StreamingException, TransactionBatchUnAvailable, ImpersonationFailed
+        , InterruptedException {
+      if (ugi == null) {
         return fetchTransactionBatchImpl(numTransactions, recordWriter);
       }
       try {
-        return ugi.doAs (
-                new PrivilegedExceptionAction<TransactionBatch>() {
-                  @Override
-                  public TransactionBatch run() throws StreamingException, InterruptedException {
-                    return fetchTransactionBatchImpl(numTransactions, recordWriter);
-                  }
-                }
+        return ugi.doAs(
+            new PrivilegedExceptionAction<TransactionBatch>() {
+              @Override
+              public TransactionBatch run() throws StreamingException, InterruptedException {
+                return fetchTransactionBatchImpl(numTransactions, recordWriter);
+              }
+            }
         );
       } catch (IOException e) {
         throw new ImpersonationFailed("Failed to fetch Txn Batch as user '" + ugi.getShortUserName()
-                + "' when acquiring Transaction Batch on endPoint " + endPt, e);
+            + "' when acquiring Transaction Batch on endPoint " + endPt, e);
       }
     }
 
     private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
-                                                  RecordWriter recordWriter)
-            throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+        RecordWriter recordWriter)
+        throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
       return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient,
           heartbeaterMSClient, recordWriter, agentInfo);
     }
 
-
     private static void createPartitionIfNotExists(HiveEndPoint ep,
-                                                   IMetaStoreClient msClient, HiveConf conf)
-            throws InvalidTable, PartitionCreationFailed {
+        IMetaStoreClient msClient, HiveConf conf) throws PartitionCreationFailed {
       if (ep.partitionVals.isEmpty()) {
         return;
       }
-      SessionState localSession = null;
-      if(SessionState.get() == null) {
-        localSession = SessionState.start(new CliSessionState(conf));
-      }
-      IDriver driver = DriverFactory.newDriver(conf);
 
       try {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Attempting to create partition (if not existent) " + ep);
-        }
-
-        List<FieldSchema> partKeys = msClient.getTable(ep.database, ep.table)
-                .getPartitionKeys();
-        runDDL(driver, "use " + ep.database);
-        String query = "alter table " + ep.table + " add if not exists partition "
-                + partSpecStr(partKeys, ep.partitionVals);
-        runDDL(driver, query);
-      } catch (MetaException e) {
-        LOG.error("Failed to create partition : " + ep, e);
-        throw new PartitionCreationFailed(ep, e);
-      } catch (NoSuchObjectException e) {
-        LOG.error("Failed to create partition : " + ep, e);
-        throw new InvalidTable(ep.database, ep.table);
-      } catch (TException e) {
-        LOG.error("Failed to create partition : " + ep, e);
-        throw new PartitionCreationFailed(ep, e);
-      } catch (QueryFailedException e) {
+        org.apache.hadoop.hive.ql.metadata.Table tableObject =
+            new org.apache.hadoop.hive.ql.metadata.Table(msClient.getTable(ep.database, ep.table));
+        Map<String, String> partSpec =
+            Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), ep.partitionVals);
+
+        AddPartitionDesc addPartitionDesc = new AddPartitionDesc(ep.database, ep.table, true);
+        String partLocation = new Path(tableObject.getDataLocation(),
+            Warehouse.makePartPath(partSpec)).toString();
+        addPartitionDesc.addPartition(partSpec, partLocation);
+        Partition partition = Hive.convertAddSpecToMetaPartition(tableObject,
+            addPartitionDesc.getPartition(0), conf);
+        msClient.add_partition(partition);
+      }
+      catch (AlreadyExistsException e) {
+        //ignore this - multiple clients may be trying to create the same partition
+        //AddPartitionDesc has ifExists flag but it's not propagated to
+        // HMSHnalder.add_partitions_core() and so it throws...
+      }
+      catch(HiveException|TException e) {
         LOG.error("Failed to create partition : " + ep, e);
         throw new PartitionCreationFailed(ep, e);
-      } finally {
-        driver.close();
-        try {
-          if(localSession != null) {
-            localSession.close();
-          }
-        } catch (IOException e) {
-          LOG.warn("Error closing SessionState used to run Hive DDL.");
-        }
       }
     }
 
index 5e5bc83..137323c 100644 (file)
@@ -1355,6 +1355,18 @@ public class TestStreaming {
     }
   }
 
+  /**
+   * Make sure that creating an already existing partion is handled gracefully
+   * @throws Exception
+   */
+  @Test
+  public void testCreatePartition() throws Exception {
+    final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);
+    StreamingConnection conn = ep.newConnection(true);
+    conn.close();
+    conn = ep.newConnection(true);
+    conn.close();
+  }
   @Test
   public void testConcurrentTransactionBatchCommits() throws Exception {
     final HiveEndPoint ep = new HiveEndPoint(metaStoreURI, dbName, tblName, partitionVals);