HIVE-19260 - Streaming Ingest API doesn't normalize db.table names (Eugene Koifman...
authorEugene Koifman <ekoifman@apache.org>
Tue, 24 Apr 2018 18:31:28 +0000 (11:31 -0700)
committerEugene Koifman <ekoifman@apache.org>
Tue, 24 Apr 2018 18:31:28 +0000 (11:31 -0700)
hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
streaming/src/test/org/apache/hive/streaming/TestStreaming.java

index 6d248ea..8582e9a 100644 (file)
@@ -88,13 +88,13 @@ public class HiveEndPoint {
     if (database==null) {
       throw new IllegalArgumentException("Database cannot be null for HiveEndPoint");
     }
-    this.database = database;
-    this.table = table;
+    this.database = database.toLowerCase();
     if (table==null) {
       throw new IllegalArgumentException("Table cannot be null for HiveEndPoint");
     }
     this.partitionVals = partitionVals==null ? new ArrayList<String>()
                                              : new ArrayList<String>( partitionVals );
+    this.table = table.toLowerCase();
   }
 
 
index 3733e3d..fe2b1c1 100644 (file)
@@ -67,6 +67,8 @@ import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -353,10 +355,10 @@ public class TestStreaming {
     //todo: why does it need transactional_properties?
     queryTable(driver, "create table default.streamingnobuckets (a string, b string) stored as orc TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')");
     queryTable(driver, "insert into default.streamingnobuckets values('foo','bar')");
-    List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
+    List<String> rs = queryTable(driver, "select * from default.streamingNoBuckets");
     Assert.assertEquals(1, rs.size());
     Assert.assertEquals("foo\tbar", rs.get(0));
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "streamingnobuckets", null);
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "Default", "StreamingNoBuckets", null);
     String[] colNames1 = new String[] { "a", "b" };
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",",  endPt, connection);
@@ -365,6 +367,11 @@ public class TestStreaming {
     txnBatch.beginNextTransaction();
     txnBatch.write("a1,b2".getBytes());
     txnBatch.write("a3,b4".getBytes());
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    ShowLocksResponse resp = txnHandler.showLocks(new ShowLocksRequest());
+    Assert.assertEquals(resp.getLocksSize(), 1);
+    Assert.assertEquals("streamingnobuckets", resp.getLocks().get(0).getTablename());
+    Assert.assertEquals("default", resp.getLocks().get(0).getDbname());
     txnBatch.commit();
     txnBatch.beginNextTransaction();
     txnBatch.write("a5,b6".getBytes());
index 4d51bbc..fe6d2d6 100644 (file)
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.slf4j.Logger;
@@ -43,8 +44,11 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -125,7 +129,9 @@ public class Cleaner extends CompactorThread {
         }
         if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
           ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
-
+          if(LOG.isDebugEnabled()) {
+            dumpLockState(locksResponse);
+          }
           for (CompactionInfo ci : toClean) {
             // Check to see if we have seen this request before.  If so, ignore it.  If not,
             // add it to our queue.
@@ -163,6 +169,9 @@ public class Cleaner extends CompactorThread {
                 for (Long lockId : expiredLocks) {
                   queueEntry.getValue().remove(lockId);
                 }
+                LOG.info("Skipping cleaning of " +
+                    idWatermark(compactId2CompactInfoMap.get(queueEntry.getKey())) +
+                    " due to reader present: " + queueEntry.getValue());
               }
             }
           } finally {
@@ -203,9 +212,18 @@ public class Cleaner extends CompactorThread {
   private Set<Long> findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) {
     Set<Long> relatedLocks = new HashSet<Long>();
     for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
-      if (ci.dbname.equals(lock.getDbname())) {
+      /**
+       * Hive QL is not case sensitive wrt db/table/column names
+       * Partition names get
+       * normalized (as far as I can tell) by lower casing column name but not partition value.
+       * {@link org.apache.hadoop.hive.metastore.Warehouse#makePartName(List, List, String)}
+       * {@link org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer#getPartSpec(ASTNode)}
+       * Since user input may start out in any case, compare here case-insensitive for db/table
+       * but leave partition name as is.
+       */
+      if (ci.dbname.equalsIgnoreCase(lock.getDbname())) {
         if ((ci.tableName == null && lock.getTablename() == null) ||
-            (ci.tableName != null && ci.tableName.equals(lock.getTablename()))) {
+            (ci.tableName != null && ci.tableName.equalsIgnoreCase(lock.getTablename()))) {
           if ((ci.partName == null && lock.getPartname() == null) ||
               (ci.partName != null && ci.partName.equals(lock.getPartname()))) {
             relatedLocks.add(lock.getLockid());
@@ -226,12 +244,13 @@ public class Cleaner extends CompactorThread {
   }
 
   private void clean(CompactionInfo ci) throws MetaException {
-    LOG.info("Starting cleaning for " + ci.getFullPartitionName());
+    LOG.info("Starting cleaning for " + ci);
     try {
       Table t = resolveTable(ci);
       if (t == null) {
         // The table was dropped before we got around to cleaning it.
-        LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped");
+        LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." +
+            idWatermark(ci));
         txnHandler.markCleaned(ci);
         return;
       }
@@ -241,7 +260,7 @@ public class Cleaner extends CompactorThread {
         if (p == null) {
           // The partition was dropped before we got around to cleaning it.
           LOG.info("Unable to find partition " + ci.getFullPartitionName() +
-              ", assuming it was dropped");
+              ", assuming it was dropped." + idWatermark(ci));
           txnHandler.markCleaned(ci);
           return;
         }
@@ -271,7 +290,7 @@ public class Cleaner extends CompactorThread {
           : new ValidReaderWriteIdList();
 
       if (runJobAsSelf(ci.runAs)) {
-        removeFiles(location, validWriteIdList);
+        removeFiles(location, validWriteIdList, ci);
       } else {
         LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
         UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
@@ -279,7 +298,7 @@ public class Cleaner extends CompactorThread {
         ugi.doAs(new PrivilegedExceptionAction<Object>() {
           @Override
           public Object run() throws Exception {
-            removeFiles(location, validWriteIdList);
+            removeFiles(location, validWriteIdList, ci);
             return null;
           }
         });
@@ -287,7 +306,7 @@ public class Cleaner extends CompactorThread {
           FileSystem.closeAllForUGI(ugi);
         } catch (IOException exception) {
           LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
-              ci.getFullPartitionName(), exception);
+              ci.getFullPartitionName() + idWatermark(ci), exception);
         }
       }
       txnHandler.markCleaned(ci);
@@ -297,20 +316,35 @@ public class Cleaner extends CompactorThread {
       txnHandler.markFailed(ci);
     }
   }
-
-  private void removeFiles(String location, ValidWriteIdList writeIdList) throws IOException {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, writeIdList);
+  private static String idWatermark(CompactionInfo ci) {
+    return " id=" + ci.id;
+  }
+  private void removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci)
+      throws IOException {
+    Path locPath = new Path(location);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList);
     List<FileStatus> obsoleteDirs = dir.getObsolete();
     List<Path> filesToDelete = new ArrayList<Path>(obsoleteDirs.size());
+    StringBuilder extraDebugInfo = new StringBuilder("[");
     for (FileStatus stat : obsoleteDirs) {
       filesToDelete.add(stat.getPath());
+      extraDebugInfo.append(stat.getPath().getName()).append(",");
+      if(!FileUtils.isPathWithinSubtree(stat.getPath(), locPath)) {
+        LOG.info(idWatermark(ci) + " found unexpected file: " + stat.getPath());
+      }
     }
+    extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
+    List<Long> compactIds = new ArrayList<>(compactId2CompactInfoMap.keySet());
+    Collections.sort(compactIds);
+    extraDebugInfo.append("compactId2CompactInfoMap.keySet(").append(compactIds).append(")");
+    LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() +
+         " obsolete directories from " + location + ". " + extraDebugInfo.toString());
     if (filesToDelete.size() < 1) {
       LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location +
           ", that hardly seems right.");
       return;
     }
-    LOG.info("About to remove " + filesToDelete.size() + " obsolete directories from " + location);
+
     FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
 
     for (Path dead : filesToDelete) {
@@ -319,4 +353,63 @@ public class Cleaner extends CompactorThread {
       fs.delete(dead, true);
     }
   }
+  private static class LockComparator implements Comparator<ShowLocksResponseElement> {
+    //sort ascending by resource, nulls first
+    @Override
+    public int compare(ShowLocksResponseElement o1, ShowLocksResponseElement o2) {
+      if(o1 == o2) {
+        return 0;
+      }
+      if(o1 == null) {
+        return -1;
+      }
+      if(o2 == null) {
+        return 1;
+      }
+      int v = o1.getDbname().compareToIgnoreCase(o2.getDbname());
+      if(v != 0) {
+        return v;
+      }
+      if(o1.getTablename() == null) {
+        return -1;
+      }
+      if(o2.getTablename() == null) {
+        return 1;
+      }
+      v = o1.getTablename().compareToIgnoreCase(o2.getTablename());
+      if(v != 0) {
+        return v;
+      }
+      if(o1.getPartname() == null) {
+        return -1;
+      }
+      if(o2.getPartname() == null) {
+        return 1;
+      }
+      v = o1.getPartname().compareToIgnoreCase(o2.getPartname());
+      if(v != 0) {
+        return v;
+      }
+      //if still equal, compare by lock ids
+      v = Long.compare(o1.getLockid(), o2.getLockid());
+      if(v != 0) {
+        return v;
+      }
+      return Long.compare(o1.getLockIdInternal(), o2.getLockIdInternal());
+
+    }
+  }
+  private void dumpLockState(ShowLocksResponse slr) {
+    Iterator<ShowLocksResponseElement> l = slr.getLocksIterator();
+    List<ShowLocksResponseElement> sortedList = new ArrayList<>();
+    while(l.hasNext()) {
+      sortedList.add(l.next());
+    }
+    //sort for readability
+    sortedList.sort(new LockComparator());
+    LOG.info("dumping locks");
+    for(ShowLocksResponseElement lock : sortedList) {
+      LOG.info(lock.toString());
+    }
+  }
 }
index 22765b8..c95daaf 100644 (file)
@@ -248,7 +248,7 @@ public class Initiator extends CompactorThread {
     if (runJobAsSelf(runAs)) {
       return determineCompactionType(ci, writeIds, sd, tblproperties);
     } else {
-      LOG.info("Going to initiate as user " + runAs);
+      LOG.info("Going to initiate as user " + runAs + " for " + ci.getFullPartitionName());
       UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
         UserGroupInformation.getLoginUser());
       CompactionType compactionType = ugi.doAs(new PrivilegedExceptionAction<CompactionType>() {
index 39a0f31..db596a6 100644 (file)
@@ -1656,9 +1656,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             if(!updateTxnComponents) {
               continue;
             }
-            String dbName = lc.getDbname();
-            String tblName = lc.getTablename();
-            String partName = lc.getPartitionname();
+            String dbName = normalizeCase(lc.getDbname());
+            String tblName = normalizeCase(lc.getTablename());
+            String partName = normalizeCase(lc.getPartitionname());
             Long writeId = null;
             if (tblName != null) {
               // It is assumed the caller have already allocated write id for adding/updating data to
@@ -1666,8 +1666,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               // may return empty result sets.
               // Get the write id allocated by this txn for the given table writes
               s = "select t2w_writeid from TXN_TO_WRITE_ID where"
-                      + " t2w_database = " + quoteString(dbName.toLowerCase())
-                      + " and t2w_table = " + quoteString(tblName.toLowerCase())
+                      + " t2w_database = " + quoteString(dbName)
+                      + " and t2w_table = " + quoteString(tblName)
                       + " and t2w_txnid = " + txnid;
               LOG.debug("Going to execute query <" + s + ">");
               rs = stmt.executeQuery(s);
@@ -1704,9 +1704,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               + lc + " agentInfo=" + rqst.getAgentInfo());
           }
           intLockId++;
-          String dbName = lc.getDbname();
-          String tblName = lc.getTablename();
-          String partName = lc.getPartitionname();
+          String dbName = normalizeCase(lc.getDbname());
+          String tblName = normalizeCase(lc.getTablename());
+          String partName = normalizeCase(lc.getPartitionname());
           LockType lockType = lc.getType();
           char lockChar = 'z';
           switch (lockType) {
@@ -1764,6 +1764,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       return enqueueLockWithRetry(rqst);
     }
   }
+  private static String normalizeCase(String s) {
+    return s == null ? null : s.toLowerCase();
+  }
   private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId)
     throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException {
     try {
@@ -2385,7 +2388,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         Long writeId = rqst.getWriteid();
         List<String> rows = new ArrayList<>();
         for (String partName : rqst.getPartitionnames()) {
-          rows.add(rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
+          rows.add(rqst.getTxnid() + "," + quoteString(normalizeCase(rqst.getDbname()))
+              + "," + quoteString(normalizeCase(rqst.getTablename())) +
               "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + "," + writeId);
         }
         int modCount = 0;
index e5dd3b3..3343d10 100644 (file)
@@ -69,6 +69,8 @@ import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -356,7 +358,7 @@ public class TestStreaming {
     List<String> rs = queryTable(driver, "select * from default.streamingnobuckets");
     Assert.assertEquals(1, rs.size());
     Assert.assertEquals("foo\tbar", rs.get(0));
-    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "default", "streamingnobuckets", null);
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "Default", "streamingNoBuckets", null);
     String[] colNames1 = new String[] { "a", "b" };
     StreamingConnection connection = endPt.newConnection(false, "UT_" + Thread.currentThread().getName());
     DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",",  endPt, connection);
@@ -365,6 +367,11 @@ public class TestStreaming {
     txnBatch.beginNextTransaction();
     txnBatch.write("a1,b2".getBytes());
     txnBatch.write("a3,b4".getBytes());
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    ShowLocksResponse resp = txnHandler.showLocks(new ShowLocksRequest());
+    Assert.assertEquals(resp.getLocksSize(), 1);
+    Assert.assertEquals("streamingnobuckets", resp.getLocks().get(0).getTablename());
+    Assert.assertEquals("default", resp.getLocks().get(0).getDbname());
     txnBatch.commit();
     txnBatch.beginNextTransaction();
     txnBatch.write("a5,b6".getBytes());