HIVE-21063 : Support statistics in cachedStore for transactional table. (Mahesh Kumar...
authorMahesh Kumar Behera <mahesh@apache.org>
Fri, 8 Feb 2019 06:21:00 +0000 (11:51 +0530)
committerMahesh Kumar Behera <mahesh@apache.org>
Fri, 8 Feb 2019 06:21:00 +0000 (11:51 +0530)
25 files changed:
hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java
ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java
standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java

index 8404e3e..963b227 100644 (file)
@@ -759,7 +759,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
             .buildUpdateTableColumnStatMessage(updateTableColumnStatEvent.getColStats(),
                     updateTableColumnStatEvent.getTableObj(),
                     updateTableColumnStatEvent.getTableParameters(),
-                    updateTableColumnStatEvent.getValidWriteIds(), updateTableColumnStatEvent.getWriteId());
+                    updateTableColumnStatEvent.getWriteId());
     NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_TABLE_COLUMN_STAT.toString(),
                     msgEncoder.getSerializer().serialize(msg));
     ColumnStatisticsDesc statDesc = updateTableColumnStatEvent.getColStats().getStatsDesc();
@@ -789,7 +789,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
                     updatePartColStatEvent.getPartVals(),
                     updatePartColStatEvent.getPartParameters(),
                     updatePartColStatEvent.getTableObj(),
-                    updatePartColStatEvent.getValidWriteIds(), updatePartColStatEvent.getWriteId());
+                    updatePartColStatEvent.getWriteId());
     NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_PARTITION_COLUMN_STAT.toString(),
                     msgEncoder.getSerializer().serialize(msg));
     ColumnStatisticsDesc statDesc = updatePartColStatEvent.getPartColStats().getStatsDesc();
index 83f12a5..cdfc60c 100644 (file)
@@ -4,6 +4,8 @@ import java.util.*;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.*;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
@@ -12,7 +14,10 @@ import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
 import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.hcatalog.listener.DbNotificationListener;
 import org.junit.Assert;
@@ -29,6 +34,7 @@ public class TestCachedStoreUpdateUsingEvents {
   private SharedCache sharedCache;
   private Configuration conf;
   private HiveMetaStore.HMSHandler hmsHandler;
+  private String[] colType = new String[] {"double", "string"};
 
   @Before
   public void setUp() throws Exception {
@@ -39,11 +45,14 @@ public class TestCachedStoreUpdateUsingEvents {
     MetastoreConf.setVar(conf, ConfVars.TRANSACTIONAL_EVENT_LISTENERS, DbNotificationListener.class.getName());
     MetastoreConf.setVar(conf, ConfVars.RAW_STORE_IMPL, "org.apache.hadoop.hive.metastore.cache.CachedStore");
     MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT, true);
+    MetastoreConf.setBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED, true);
+    MetastoreConf.setBoolVar(conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED, false);
     MetaStoreTestUtils.setConfForStandloneMode(conf);
 
     hmsHandler = new HiveMetaStore.HMSHandler("testCachedStore", conf, true);
 
-    rawStore = hmsHandler.getMS();
+    rawStore = new ObjectStore();
+    rawStore.setConf(hmsHandler.getConf());
     sharedCache = CachedStore.getSharedCache();
 
     // Stop the CachedStore cache update service. We'll start it explicitly to control the test
@@ -69,10 +78,14 @@ public class TestCachedStoreUpdateUsingEvents {
     String serdeLocation = "file:/tmp";
     Map<String, String> serdeParams = new HashMap<>();
     SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<>());
-    StorageDescriptor sd = new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0,
+    StorageDescriptor sd = new StorageDescriptor(cols, serdeLocation,
+            null, null, false, 3,
             serdeInfo, null, null, serdeParams);
+    sd.setInputFormat(OrcInputFormat.class.getName());
+    sd.setOutputFormat(OrcOutputFormat.class.getName());
     sd.setStoredAsSubDirectories(false);
-    Table tbl = new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null,
+    Table tbl = new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams,
+            null, null,
             TableType.MANAGED_TABLE.toString());
     tbl.setCatName(DEFAULT_CATALOG_NAME);
     return tbl;
@@ -186,6 +199,10 @@ public class TestCachedStoreUpdateUsingEvents {
     long lastEventId = -1;
     RawStore rawStore = hmsHandler.getMS();
 
+    // Prewarm CachedStore
+    CachedStore.setCachePrewarmedState(false);
+    CachedStore.prewarm(rawStore);
+
     // Add a db via rawStore
     String dbName = "test_table_ops";
     String dbOwner = "user1";
@@ -193,10 +210,6 @@ public class TestCachedStoreUpdateUsingEvents {
     hmsHandler.create_database(db);
     db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName);
 
-    // Prewarm CachedStore
-    CachedStore.setCachePrewarmedState(false);
-    CachedStore.prewarm(rawStore);
-
     // Add a table via rawStore
     String tblName = "tbl";
     String tblOwner = "user1";
@@ -263,6 +276,10 @@ public class TestCachedStoreUpdateUsingEvents {
     long lastEventId = -1;
     RawStore rawStore = hmsHandler.getMS();
 
+    // Prewarm CachedStore
+    CachedStore.setCachePrewarmedState(false);
+    CachedStore.prewarm(rawStore);
+
     // Add a db via rawStore
     String dbName = "test_partition_ops";
     String dbOwner = "user1";
@@ -285,21 +302,19 @@ public class TestCachedStoreUpdateUsingEvents {
     hmsHandler.create_table(tbl);
     tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
 
-    // Prewarm CachedStore
-    CachedStore.setCachePrewarmedState(false);
-    CachedStore.prewarm(rawStore);
-
     final String ptnColVal1 = "aaa";
     Map<String, String> partParams = new HashMap<String, String>();
     Partition ptn1 =
-            new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, tbl.getSd(), partParams);
+            new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0,
+                    0, tbl.getSd(), partParams);
     ptn1.setCatName(DEFAULT_CATALOG_NAME);
     hmsHandler.add_partition(ptn1);
     ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1));
 
     final String ptnColVal2 = "bbb";
     Partition ptn2 =
-            new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, tbl.getSd(), partParams);
+            new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0,
+                    0, tbl.getSd(), partParams);
     ptn2.setCatName(DEFAULT_CATALOG_NAME);
     hmsHandler.add_partition(ptn2);
     ptn2 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2));
@@ -307,17 +322,21 @@ public class TestCachedStoreUpdateUsingEvents {
     // Read database, table, partition via CachedStore
     Database dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase());
     Assert.assertEquals(db, dbRead);
-    Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase(), tblName.toLowerCase());
+    Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME.toLowerCase(),
+            dbName.toLowerCase(), tblName.toLowerCase());
     compareTables(tbl, tblRead);
-    Partition ptn1Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal1));
+    Partition ptn1Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(),
+            dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal1));
     comparePartitions(ptn1, ptn1Read);
-    Partition ptn2Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal2));
+    Partition ptn2Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(),
+            dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal2));
     comparePartitions(ptn2, ptn2Read);
 
     // Add a new partition via rawStore
     final String ptnColVal3 = "ccc";
     Partition ptn3 =
-            new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, tbl.getSd(), partParams);
+            new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0,
+                    0, tbl.getSd(), partParams);
     ptn3.setCatName(DEFAULT_CATALOG_NAME);
     hmsHandler.add_partition(ptn3);
     ptn3 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3));
@@ -326,7 +345,8 @@ public class TestCachedStoreUpdateUsingEvents {
     ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1));
     final String ptnColVal1Alt = "aaa";
     Partition ptn1Atl =
-            new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, tbl.getSd(), partParams);
+            new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0,
+                    0, tbl.getSd(), partParams);
     ptn1Atl.setCatName(DEFAULT_CATALOG_NAME);
     hmsHandler.alter_partitions(dbName, tblName, Arrays.asList(ptn1Atl));
     ptn1Atl = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt));
@@ -336,7 +356,8 @@ public class TestCachedStoreUpdateUsingEvents {
     hmsHandler.drop_partition(dbName, tblName, Arrays.asList(ptnColVal2), false);
 
     // Read the newly added partition via CachedStore
-    Partition ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3));
+    Partition ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName,
+            tblName, Arrays.asList(ptnColVal3));
     comparePartitions(ptn3, ptnRead);
 
     // Read the altered partition via CachedStore
@@ -362,49 +383,116 @@ public class TestCachedStoreUpdateUsingEvents {
     sharedCache.getSdCache().clear();
   }
 
-  @Test
-  public void testTableColumnStatistics() throws Throwable {
-    String dbName = "column_stats_test_db";
-    String tblName = "tbl";
-    String typeName = "person";
-    String tblOwner = "testowner";
-    int lastAccessed = 6796;
-    String dbOwner = "user1";
+  private void updateTableColStats(String dbName, String tblName, String[] colName,
+                                   double highValue, double avgColLen, boolean isTxnTable) throws Throwable {
+    long writeId = -1;
+    String validWriteIds = null;
+    if (isTxnTable) {
+      writeId = allocateWriteIds(allocateTxns(1), dbName, tblName).get(0).getWriteId();
+      validWriteIds = getValidWriteIds(dbName, tblName);
+    }
 
-    // Add a db via rawStore
-    Database db = createTestDb(dbName, dbOwner);
-    hmsHandler.create_database(db);
-    db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName);
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+    statsDesc.setDbName(dbName);
+    statsDesc.setTableName(tblName);
+    statsDesc.setIsTblLevel(true);
+    statsDesc.setPartName(null);
 
-    Map<String, String> tableParams =  new HashMap<>();
-    tableParams.put("test_param_1", "hi");
-    tableParams.put("test_param_2", "50");
+    ColumnStatistics colStats = new ColumnStatistics();
+    colStats.setStatsDesc(statsDesc);
+    colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, avgColLen));
 
-    // Add a table via rawStore
-    List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(new FieldSchema("income", "int", "integer column"));
-    cols.add(new FieldSchema("name", "string", "string column"));
+    SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+    setTblColStat.setWriteId(writeId);
+    setTblColStat.setValidWriteIdList(validWriteIds);
 
-    List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
-    ptnCols.add(new FieldSchema("ds", "string", "string partition column"));
-    ptnCols.add(new FieldSchema("hr", "int", "integer partition column"));
+    // write stats objs persistently
+    hmsHandler.update_table_column_statistics_req(setTblColStat);
+    validateTablePara(dbName, tblName);
+
+    ColumnStatistics colStatsCache = sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
+            dbName, tblName, Lists.newArrayList(colName[0]), validWriteIds, true);
+    Assert.assertEquals(colStatsCache.getStatsObj().get(0).getColName(), colName[0]);
+    verifyStatDouble(colStatsCache.getStatsObj().get(0), colName[0], highValue);
+
+    colStatsCache = sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
+            dbName, tblName, Lists.newArrayList(colName[1]), validWriteIds, true);
+    Assert.assertEquals(colStatsCache.getStatsObj().get(0).getColName(), colName[1]);
+    verifyStatString(colStatsCache.getStatsObj().get(0), colName[1], avgColLen);
+  }
 
-    Table tbl = createTestTblParam(dbName, tblName, tblOwner, cols, null, tableParams);
-    hmsHandler.create_table(tbl);
+  private void updatePartColStats(String dbName, String tblName, boolean isTxnTable, String[] colName,
+                                  String partName, double highValue, double avgColLen) throws Throwable {
+    long writeId = -1;
+    String validWriteIds = null;
+    List<Long> txnIds = null;
 
-    // Prewarm CachedStore
-    CachedStore.setCachePrewarmedState(false);
-    CachedStore.prewarm(rawStore);
+    if (isTxnTable) {
+      txnIds = allocateTxns(1);
+      writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+      validWriteIds = getValidWriteIds(dbName, tblName);
+    }
 
-    // Create a ColumnStatistics Obj
-    String[] colName = new String[]{"income", "name"};
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+    statsDesc.setDbName(dbName);
+    statsDesc.setTableName(tblName);
+    statsDesc.setIsTblLevel(false);
+    statsDesc.setPartName(partName);
+
+    ColumnStatistics colStats = new ColumnStatistics();
+    colStats.setStatsDesc(statsDesc);
+    colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, avgColLen));
+
+    SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+    setTblColStat.setWriteId(writeId);
+    setTblColStat.setValidWriteIdList(validWriteIds);
+
+    // write stats objs persistently
+    hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+    if (isTxnTable) {
+      CommitTxnRequest rqst = new CommitTxnRequest(txnIds.get(0));
+      hmsHandler.commit_txn(rqst);
+      writeId = allocateWriteIds(allocateTxns(1), dbName, tblName).get(0).getWriteId();
+      validWriteIds = getValidWriteIds(dbName, tblName);
+    }
+
+    Deadline.startTimer("getPartitionColumnStatistics");
+    List<ColumnStatistics> statRowStore = rawStore.getPartitionColumnStatistics(DEFAULT_CATALOG_NAME, dbName, tblName,
+            Collections.singletonList(partName), Collections.singletonList(colName[1]), validWriteIds);
+    Deadline.stopTimer();
+    verifyStatString(statRowStore.get(0).getStatsObj().get(0), colName[1], avgColLen);
+    if (isTxnTable) {
+      Assert.assertEquals(statRowStore.get(0).isIsStatsCompliant(), true);
+    } else {
+      Assert.assertEquals(statRowStore.get(0).isIsStatsCompliant(), false);
+    }
+
+    List<ColumnStatistics> statSharedCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME,
+            dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]),
+            validWriteIds, true);
+    verifyStatString(statSharedCache.get(0).getStatsObj().get(0), colName[1], avgColLen);
+    if (isTxnTable) {
+      Assert.assertEquals(statSharedCache.get(0).isIsStatsCompliant(), true);
+    } else {
+      Assert.assertEquals(statSharedCache.get(0).isIsStatsCompliant(), false);
+    }
+
+    SharedCache.ColumStatsWithWriteId statPartCache = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME,
+            dbName, tblName, CachedStore.partNameToVals(partName), colName[0], validWriteIds);
+    verifyStatDouble(statPartCache.getColumnStatisticsObj(), colName[0], highValue);
+
+    statPartCache = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
+            CachedStore.partNameToVals(partName), colName[1], validWriteIds);
+    verifyStatString(statPartCache.getColumnStatisticsObj(), colName[1], avgColLen);
+  }
+
+  private List<ColumnStatisticsObj> getStatsObjects(String dbName, String tblName, String[] colName,
+                                                    double highValue, double avgColLen) throws Throwable {
     double lowValue = 50000.21;
-    double highValue = 1200000.4525;
     long numNulls = 3;
     long numDVs = 22;
-    double avgColLen = 50.30;
     long maxColLen = 102;
-    String[] colType = new String[] {"double", "string"};
     boolean isTblLevel = true;
     String partName = null;
     List<ColumnStatisticsObj> statsObjs = new ArrayList<>();
@@ -445,42 +533,96 @@ public class TestCachedStoreUpdateUsingEvents {
 
     statsObj.setStatsData(statsData);
     statsObjs.add(statsObj);
+    return statsObjs;
+  }
 
-    ColumnStatistics colStats = new ColumnStatistics();
-    colStats.setStatsDesc(statsDesc);
-    colStats.setStatsObj(statsObjs);
+  private void verifyStatDouble(ColumnStatisticsObj colStats, String colName, double highValue) {
+    double lowValue = 50000.21;
+    long numNulls = 3;
+    long numDVs = 22;
+    Assert.assertEquals(colStats.getColName(), colName);
+    Assert.assertEquals(colStats.getStatsData().getDoubleStats().getLowValue(), lowValue, 0.01);
+    Assert.assertEquals(colStats.getStatsData().getDoubleStats().getHighValue(), highValue, 0.01);
+    Assert.assertEquals(colStats.getStatsData().getDoubleStats().getNumNulls(), numNulls);
+    Assert.assertEquals(colStats.getStatsData().getDoubleStats().getNumDVs(), numDVs);
+  }
 
-    // write stats objs persistently
-    hmsHandler.update_table_column_statistics(colStats);
-
-    ColumnStatisticsObj colStatsCache = sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
-            dbName, tblName, Lists.newArrayList(colName[0])).get(0);
-    Assert.assertEquals(colStatsCache.getColName(), colName[0]);
-    Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getLowValue(), lowValue, 0.01);
-    Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getHighValue(), highValue, 0.01);
-    Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getNumNulls(), numNulls);
-    Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getNumDVs(), numDVs);
-
-    // test delete column stats; if no col name is passed all column stats associated with the
-    // table is deleted
-    boolean status = hmsHandler.delete_table_column_statistics(dbName, tblName, null);
-    Assert.assertEquals(status, true);
+  private void verifyStatString(ColumnStatisticsObj colStats, String colName, double avgColLen) {
+    long numNulls = 3;
+    long numDVs = 22;
+    long maxColLen = 102;
+    Assert.assertEquals(colStats.getColName(), colName);
+    Assert.assertEquals(colStats.getStatsData().getStringStats().getMaxColLen(), maxColLen);
+    Assert.assertEquals(colStats.getStatsData().getStringStats().getAvgColLen(), avgColLen, 0.01);
+    Assert.assertEquals(colStats.getStatsData().getStringStats().getNumNulls(), numNulls);
+    Assert.assertEquals(colStats.getStatsData().getStringStats().getNumDVs(), numDVs);
+  }
 
-    Assert.assertEquals(sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
-            dbName, tblName, Lists.newArrayList(colName[0])).isEmpty(), true);
+  private void verifyStat(List<ColumnStatisticsObj> colStats, String[] colName, double highValue, double avgColLen) {
+    //verifyStatDouble(colStats.get(0), colName[0], highValue);
+    verifyStatString(colStats.get(0), colName[1], avgColLen);
+  }
 
-    tblName = "tbl_part";
-    cols = new ArrayList<>();
-    cols.add(new FieldSchema(colName[0], "int", null));
+  private void setUpBeforeTest(String dbName, String tblName, String[] colName, boolean isTxnTable) throws Throwable {
+    String dbOwner = "user1";
+
+    // Prewarm CachedStore
+    CachedStore.setCachePrewarmedState(false);
+    CachedStore.prewarm(rawStore);
+
+    // Add a db via rawStore
+    Database db = createTestDb(dbName, dbOwner);
+    hmsHandler.create_database(db);
+    if (tblName != null) {
+      createTestTable(dbName, tblName, colName, isTxnTable);
+    }
+  }
+
+  private void createTestTable(String dbName, String tblName, String[] colName, boolean isTxnTable) throws Throwable {
+    // Add a table via rawStore
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema(colName[0], "int", "integer column"));
+    cols.add(new FieldSchema(colName[1], "string", "string column"));
+
+    Map<String, String> tableParams =  new HashMap<>();
+    tableParams.put("test_param_1", "hi");
+    tableParams.put("test_param_2", "50");
+    if (isTxnTable) {
+      tableParams.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+    }
+
+    String tblOwner = "testowner";
+
+    List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+    ptnCols.add(new FieldSchema("ds", "string", "string partition column"));
+    ptnCols.add(new FieldSchema("hr", "int", "integer partition column"));
+
+    Table tbl = createTestTblParam(dbName, tblName, tblOwner, cols, null, tableParams);
+    hmsHandler.create_table(tbl);
+  }
+
+  private void createTableWithPart(String dbName, String tblName, String[] colName, boolean isTxnTbl) throws Throwable {
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema(colName[0], colType[0], null));
     List<FieldSchema> partCols = new ArrayList<>();
-    partCols.add(new FieldSchema("col", "int", null));
+    partCols.add(new FieldSchema(colName[0], colType[0], null));
+    Map<String, String> tableParams =  new HashMap<>();
+    tableParams.put("test_param_1", "hi");
+    tableParams.put("test_param_2", "50");
+    if (isTxnTbl) {
+      tableParams.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+      StatsSetupConst.setBasicStatsState(tableParams, StatsSetupConst.TRUE);
+    }
     StorageDescriptor sd =
-            new StorageDescriptor(cols, null, "input", "output", false,
+            new StorageDescriptor(cols, null, "orc",
+                    "orc", false,
                     0, new SerDeInfo("serde", "seriallib", new HashMap<>()),
-                    null, null, null);
+                    null, null, tableParams);
+    sd.setInputFormat(OrcInputFormat.class.getName());
+    sd.setOutputFormat(OrcOutputFormat.class.getName());
 
-    tbl = new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(),
-                    null, null, TableType.MANAGED_TABLE.toString());
+    Table tbl = new Table(tblName, dbName, null, 0, 0, 0, sd,
+            partCols, tableParams, null, null, TableType.MANAGED_TABLE.toString());
     tbl.setCatName(DEFAULT_CATALOG_NAME);
 
     hmsHandler.create_table(tbl);
@@ -489,47 +631,405 @@ public class TestCachedStoreUpdateUsingEvents {
     partVals1.add("1");
     List<String> partVals2 = new ArrayList<>();
     partVals2.add("2");
+    Map<String, String> partParams =  new HashMap<>();
+    StatsSetupConst.setBasicStatsState(partParams, StatsSetupConst.TRUE);
+    EnvironmentContext environmentContext = new EnvironmentContext();
+    environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
 
     Partition ptn1 =
-            new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>());
+            new Partition(partVals1, dbName, tblName, 0, 0, sd, partParams);
     ptn1.setCatName(DEFAULT_CATALOG_NAME);
-    hmsHandler.add_partition(ptn1);
+    hmsHandler.add_partition_with_environment_context(ptn1, environmentContext);
     Partition ptn2 =
-            new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>());
+            new Partition(partVals2, dbName, tblName, 0, 0, sd, partParams);
     ptn2.setCatName(DEFAULT_CATALOG_NAME);
-    hmsHandler.add_partition(ptn2);
+    hmsHandler.add_partition_with_environment_context(ptn2, environmentContext);
+  }
+
+  private List<Long> allocateTxns(int numTxns) throws Throwable {
+    OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "user", "host");
+    return hmsHandler.open_txns(openTxnRequest).getTxn_ids();
+  }
+
+  private List<TxnToWriteId> allocateWriteIds(List<Long> txnIds, String dbName, String tblName) throws Throwable {
+    AllocateTableWriteIdsRequest allocateTableWriteIdsRequest = new AllocateTableWriteIdsRequest(dbName, tblName);
+    allocateTableWriteIdsRequest.setTxnIds(txnIds);
+    return hmsHandler.allocate_table_write_ids(allocateTableWriteIdsRequest).getTxnToWriteIds();
+  }
+
+  private String getValidWriteIds(String dbName, String tblName) throws Throwable {
+    GetValidWriteIdsRequest validWriteIdsRequest = new GetValidWriteIdsRequest(
+            Collections.singletonList(TableName.getDbTable(dbName, tblName)));
+    GetValidWriteIdsResponse validWriteIdsResponse = hmsHandler.get_valid_write_ids(validWriteIdsRequest);
+    return TxnCommonUtils.createValidReaderWriteIdList(validWriteIdsResponse.
+            getTblValidWriteIds().get(0)).writeToString();
+  }
+
+  private void validateTablePara(String dbName, String tblName) throws Throwable {
+    Table tblRead = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
+    Table tblRead1 = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName);
+    Assert.assertEquals(tblRead.getParameters(), tblRead1.getParameters());
+  }
 
+  private void validatePartPara(String dbName, String tblName, String partName) throws Throwable {
+    //Partition part1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, partName);
+    //Partition part2 = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, partName);
+    //Assert.assertEquals(part1.getParameters(), part2.getParameters());
+  }
+
+  private void deleteColStats(String dbName, String tblName, String[] colName) throws Throwable {
+    boolean status = hmsHandler.delete_table_column_statistics(dbName, tblName, null);
+    Assert.assertEquals(status, true);
+    Assert.assertEquals(sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
+            Lists.newArrayList(colName[0]),  null, true).getStatsObj().isEmpty(), true);
+    Assert.assertEquals(sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
+            Lists.newArrayList(colName[1]), null, true).getStatsObj().isEmpty(), true);
+    validateTablePara(dbName, tblName);
+  }
+
+  private void deletePartColStats(String dbName, String tblName, String[] colName,
+                                  String partName) throws Throwable {
+    boolean status = hmsHandler.delete_partition_column_statistics(dbName, tblName, partName, colName[1]);
+    Assert.assertEquals(status, true);
+
+    SharedCache.ColumStatsWithWriteId colStats = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName,
+            tblName, CachedStore.partNameToVals(partName), colName[1], null);
+    Assert.assertEquals(colStats.getColumnStatisticsObj(), null);
+    validateTablePara(dbName, tblName);
+  }
+
+  private void testTableColStatInternal(String dbName, String tblName, boolean isTxnTable) throws Throwable {
+    String[] colName = new String[]{"income", "name"};
+    double highValue = 1200000.4525;
+    double avgColLen = 50.30;
+
+    setUpBeforeTest(dbName, tblName, colName, isTxnTable);
+    updateTableColStats(dbName, tblName, colName, highValue, avgColLen, isTxnTable);
+    if (!isTxnTable) {
+      deleteColStats(dbName, tblName, colName);
+    }
+
+    tblName = "tbl_part";
+    createTableWithPart(dbName, tblName, colName, isTxnTable);
     List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
-    partName = partitions.get(0);
-    isTblLevel = false;
+    String partName = partitions.get(0);
+    updatePartColStats(dbName, tblName, isTxnTable, colName, partName, highValue, avgColLen);
+    if (!isTxnTable) {
+      deletePartColStats(dbName, tblName, colName, partName);
+    }
+  }
+
+  @Test
+  public void testTableColumnStatistics() throws Throwable {
+    String dbName = "column_stats_test_db";
+    String tblName = "tbl";
+    testTableColStatInternal(dbName, tblName, false);
+  }
+
+  @Test
+  public void testTableColumnStatisticsTxnTable() throws Throwable {
+    String dbName = "column_stats_test_db_txn";
+    String tblName = "tbl_txn";
+    testTableColStatInternal(dbName, tblName, true);
+  }
+
+  @Test
+  public void testTableColumnStatisticsTxnTableMulti() throws Throwable {
+    String dbName = "column_stats_test_db_txn_multi";
+    String tblName = "tbl_part";
+    String[] colName = new String[]{"income", "name"};
+    double highValue = 1200000.4525;
+    double avgColLen = 50.30;
+
+    setUpBeforeTest(dbName, null, colName, true);
+    createTableWithPart(dbName, tblName, colName, true);
+    List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+    String partName = partitions.get(0);
+    updatePartColStats(dbName, tblName, true, colName, partName, highValue, avgColLen);
+    updatePartColStats(dbName, tblName, true, colName, partName, 1200000.4521, avgColLen);
+    updatePartColStats(dbName, tblName, true, colName, partName, highValue, 34.78);
+  }
+
+  @Test
+  public void testTableColumnStatisticsTxnTableMultiAbort() throws Throwable {
+    String dbName = "column_stats_test_db_txn_multi_abort";
+    String tblName = "tbl_part";
+    String[] colName = new String[]{"income", "name"};
+    double highValue = 1200000.4525;
+    double avgColLen = 50.30;
+
+    setUpBeforeTest(dbName, null, colName, true);
+    createTableWithPart(dbName, tblName, colName, true);
+    List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+    String partName = partitions.get(0);
+
+    List<Long> txnIds = allocateTxns(1);
+    long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+    String validWriteIds = getValidWriteIds(dbName, tblName);
 
     // create a new columnstatistics desc to represent partition level column stats
-    statsDesc = new ColumnStatisticsDesc();
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
     statsDesc.setDbName(dbName);
     statsDesc.setTableName(tblName);
     statsDesc.setPartName(partName);
-    statsDesc.setIsTblLevel(isTblLevel);
+    statsDesc.setIsTblLevel(false);
+
+    ColumnStatistics colStats = new ColumnStatistics();
+    colStats.setStatsDesc(statsDesc);
+    colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, avgColLen));
+
+    SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+    setTblColStat.setWriteId(writeId);
+    setTblColStat.setValidWriteIdList(validWriteIds);
+
+    // write stats objs persistently
+    hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+    // abort the txn and verify that the stats got is not compliant.
+    AbortTxnRequest rqst = new AbortTxnRequest(txnIds.get(0));
+    hmsHandler.abort_txn(rqst);
+
+    allocateWriteIds(allocateTxns(1), dbName, tblName);
+    validWriteIds = getValidWriteIds(dbName, tblName);
+
+    Deadline.startTimer("getPartitionColumnStatistics");
+    List<ColumnStatistics> statRawStore = rawStore.getPartitionColumnStatistics(DEFAULT_CATALOG_NAME, dbName, tblName,
+            Collections.singletonList(partName), Collections.singletonList(colName[1]), validWriteIds);
+    Deadline.stopTimer();
+
+    verifyStat(statRawStore.get(0).getStatsObj(), colName, highValue, avgColLen);
+    Assert.assertEquals(statRawStore.get(0).isIsStatsCompliant(), false);
+
+    List<ColumnStatistics> statsListFromCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME,
+            dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]),
+            validWriteIds, true);
+    verifyStat(statsListFromCache.get(0).getStatsObj(), colName, highValue, avgColLen);
+    Assert.assertEquals(statsListFromCache.get(0).isIsStatsCompliant(), false);
+
+    SharedCache.ColumStatsWithWriteId columStatsWithWriteId =
+            sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
+              CachedStore.partNameToVals(partName), colName[1], validWriteIds);
+    Assert.assertEquals(columStatsWithWriteId, null);
+    validatePartPara(dbName, tblName, partName);
+  }
+
+  @Test
+  public void testTableColumnStatisticsTxnTableOpenTxn() throws Throwable {
+    String dbName = "column_stats_test_db_txn_multi_open";
+    String tblName = "tbl_part";
+    String[] colName = new String[]{"income", "name"};
+    double highValue = 1200000.4121;
+    double avgColLen = 23.30;
+
+    setUpBeforeTest(dbName, null, colName, true);
+    createTableWithPart(dbName, tblName, colName, true);
+    List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+    String partName = partitions.get(0);
+
+    // update part col stats successfully.
+    updatePartColStats(dbName, tblName, true, colName, partName, 1.2, 12.2);
+
+    List<Long> txnIds = allocateTxns(1);
+    long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+    String validWriteIds = getValidWriteIds(dbName, tblName);
+
+    // create a new columnstatistics desc to represent partition level column stats
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+    statsDesc.setDbName(dbName);
+    statsDesc.setTableName(tblName);
+    statsDesc.setPartName(partName);
+    statsDesc.setIsTblLevel(false);
+
+    ColumnStatistics colStats = new ColumnStatistics();
+    colStats.setStatsDesc(statsDesc);
+    colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, avgColLen));
+
+    SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+    setTblColStat.setWriteId(writeId);
+    setTblColStat.setValidWriteIdList(validWriteIds);
+
+    // write stats objs persistently
+    hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+    // keep the txn open and verify that the stats got is not compliant.
+
+    allocateWriteIds(allocateTxns(1), dbName, tblName);
+    validWriteIds = getValidWriteIds(dbName, tblName);
+
+    Deadline.startTimer("getPartitionColumnStatistics");
+    List<ColumnStatistics> statRawStore = rawStore.getPartitionColumnStatistics(DEFAULT_CATALOG_NAME, dbName, tblName,
+            Collections.singletonList(partName), Collections.singletonList(colName[1]), validWriteIds);
+    Deadline.stopTimer();
+
+    verifyStat(statRawStore.get(0).getStatsObj(), colName, highValue, avgColLen);
+    Assert.assertEquals(statRawStore.get(0).isIsStatsCompliant(), false);
+
+    List<ColumnStatistics> statsListFromCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME,
+            dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]),
+            validWriteIds, true);
+    verifyStat(statsListFromCache.get(0).getStatsObj(), colName, highValue, avgColLen);
+    Assert.assertEquals(statsListFromCache.get(0).isIsStatsCompliant(), false);
+
+    SharedCache.ColumStatsWithWriteId columStatsWithWriteId =
+            sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName,
+              tblName, CachedStore.partNameToVals(partName), colName[1], validWriteIds);
+    Assert.assertEquals(columStatsWithWriteId, null);
+    validatePartPara(dbName, tblName, partName);
+  }
+
+  private void verifyAggrStat(String dbName, String tblName, String[] colName, List<String> partitions,
+                              boolean isTxnTbl, double highValue) throws Throwable {
+    List<Long> txnIds = allocateTxns(1);
+    allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+    String validWriteIds = getValidWriteIds(dbName, tblName);
+
+    Deadline.startTimer("getPartitionSpecsByFilterAndProjection");
+    AggrStats aggrStats = rawStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, partitions,
+            Collections.singletonList(colName[0]), validWriteIds);
+    Deadline.stopTimer();
+    Assert.assertEquals(aggrStats.getPartsFound(), 2);
+    Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getDoubleStats().getHighValue(), highValue, 0.01);
+    //Assert.assertEquals(aggrStats.isIsStatsCompliant(), true);
+
+    // This will update the cache for non txn table.
+    PartitionsStatsRequest request = new PartitionsStatsRequest(dbName, tblName,
+            Collections.singletonList(colName[0]), partitions);
+    request.setCatName(DEFAULT_CATALOG_NAME);
+    request.setValidWriteIdList(validWriteIds);
+    AggrStats aggrStatsCached = hmsHandler.get_aggr_stats_for(request);
+    Assert.assertEquals(aggrStatsCached, aggrStats);
+    //Assert.assertEquals(aggrStatsCached.isIsStatsCompliant(), true);
+
+    List<ColumnStatisticsObj> stats = sharedCache.getAggrStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
+            Collections.singletonList(colName[0]), SharedCache.StatsType.ALL);
+    Assert.assertEquals(stats.get(0).getStatsData().getDoubleStats().getHighValue(), highValue, 0.01);
+  }
+
+  @Test
+  public void testAggrStat() throws Throwable {
+    String dbName = "aggr_stats_test";
+    String tblName = "tbl_part";
+    String[] colName = new String[]{"income", "name"};
+
+    setUpBeforeTest(dbName, null, colName, false);
+    createTableWithPart(dbName, tblName, colName, false);
+    List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short) -1);
+    String partName = partitions.get(0);
+
+    // update part col stats successfully.
+    updatePartColStats(dbName, tblName, false, colName, partitions.get(0), 2, 12);
+    updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 4, 10);
+    verifyAggrStat(dbName, tblName, colName, partitions, false, 4);
+
+    updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 3, 10);
+    verifyAggrStat(dbName, tblName, colName, partitions, false, 3);
+  }
+
+  @Test
+  public void testAggrStatTxnTable() throws Throwable {
+    String dbName = "aggr_stats_test_db_txn";
+    String tblName = "tbl_part";
+    String[] colName = new String[]{"income", "name"};
 
-    colStats = new ColumnStatistics();
+    setUpBeforeTest(dbName, null, colName, true);
+    createTableWithPart(dbName, tblName, colName, true);
+    List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+    String partName = partitions.get(0);
+
+    // update part col stats successfully.
+    updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12);
+    updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10);
+    verifyAggrStat(dbName, tblName, colName, partitions, true, 4);
+
+    updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 3, 10);
+    verifyAggrStat(dbName, tblName, colName, partitions, true, 3);
+
+    List<Long> txnIds = allocateTxns(1);
+    long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+    String validWriteIds = getValidWriteIds(dbName, tblName);
+
+    // create a new columnstatistics desc to represent partition level column stats
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+    statsDesc.setDbName(dbName);
+    statsDesc.setTableName(tblName);
+    statsDesc.setPartName(partName);
+    statsDesc.setIsTblLevel(false);
+
+    ColumnStatistics colStats = new ColumnStatistics();
+    colStats.setStatsDesc(statsDesc);
+    colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, 5, 20));
+
+    SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+    setTblColStat.setWriteId(writeId);
+    setTblColStat.setValidWriteIdList(validWriteIds);
+    hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+    Deadline.startTimer("getPartitionSpecsByFilterAndProjection");
+    AggrStats aggrStats = rawStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, partitions,
+            Collections.singletonList(colName[0]), validWriteIds);
+    Deadline.stopTimer();
+    Assert.assertEquals(aggrStats, null);
+
+    // keep the txn open and verify that the stats got is not compliant.
+    PartitionsStatsRequest request = new PartitionsStatsRequest(dbName, tblName,
+            Collections.singletonList(colName[0]), partitions);
+    request.setCatName(DEFAULT_CATALOG_NAME);
+    request.setValidWriteIdList(validWriteIds);
+    AggrStats aggrStatsCached = hmsHandler.get_aggr_stats_for(request);
+    Assert.assertEquals(aggrStatsCached, null);
+  }
+
+  @Test
+  public void testAggrStatAbortTxn() throws Throwable {
+    String dbName = "aggr_stats_test_db_txn_abort";
+    String tblName = "tbl_part";
+    String[] colName = new String[]{"income", "name"};
+
+    setUpBeforeTest(dbName, null, colName, true);
+    createTableWithPart(dbName, tblName, colName, true);
+    List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+    String partName = partitions.get(0);
+
+    // update part col stats successfully.
+    updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12);
+    updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10);
+    verifyAggrStat(dbName, tblName, colName, partitions, true, 4);
+
+    List<Long> txnIds = allocateTxns(4);
+    long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+    String validWriteIds = getValidWriteIds(dbName, tblName);
+
+    // create a new columnstatistics desc to represent partition level column stats
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+    statsDesc.setDbName(dbName);
+    statsDesc.setTableName(tblName);
+    statsDesc.setPartName(partName);
+    statsDesc.setIsTblLevel(false);
+
+    ColumnStatistics colStats = new ColumnStatistics();
     colStats.setStatsDesc(statsDesc);
-    colStats.setStatsObj(statsObjs);
-
-    hmsHandler.update_partition_column_statistics(colStats);
-    ColumnStatisticsObj colStats2 = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
-            CachedStore.partNameToVals(partName), colName[1]);
-    // compare stats obj to ensure what we get is what we wrote
-    Assert.assertEquals(colStats.getStatsDesc().getPartName(), partName);
-    Assert.assertEquals(colStats2.getColName(), colName[1]);
-    Assert.assertEquals(colStats2.getStatsData().getStringStats().getMaxColLen(), maxColLen);
-    Assert.assertEquals(colStats2.getStatsData().getStringStats().getAvgColLen(), avgColLen, 0.01);
-    Assert.assertEquals(colStats2.getStatsData().getStringStats().getNumNulls(), numNulls);
-    Assert.assertEquals(colStats2.getStatsData().getStringStats().getNumDVs(), numDVs);
-
-    // test stats deletion at partition level
-    hmsHandler.delete_partition_column_statistics(dbName, tblName, partName, colName[1]);
-
-    colStats2 = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
-            CachedStore.partNameToVals(partName), colName[1]);
-    Assert.assertEquals(colStats2, null);
+    colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, 5, 20));
+
+    SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+    setTblColStat.setWriteId(writeId);
+    setTblColStat.setValidWriteIdList(validWriteIds);
+    hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+    AbortTxnRequest abortTxnRequest = new AbortTxnRequest(txnIds.get(0));
+    hmsHandler.abort_txn(abortTxnRequest);
+
+    Deadline.startTimer("getPartitionSpecsByFilterAndProjection");
+    AggrStats aggrStats = rawStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, partitions,
+            Collections.singletonList(colName[0]), validWriteIds);
+    Deadline.stopTimer();
+    Assert.assertEquals(aggrStats, null);
+
+    // keep the txn open and verify that the stats got is not compliant.
+    PartitionsStatsRequest request = new PartitionsStatsRequest(dbName, tblName,
+            Collections.singletonList(colName[0]), partitions);
+    request.setCatName(DEFAULT_CATALOG_NAME);
+    request.setValidWriteIdList(validWriteIds);
+    AggrStats aggrStatsCached = hmsHandler.get_aggr_stats_for(request);
+    Assert.assertEquals(aggrStatsCached, null);
   }
 }
index c028e12..7c1944f 100644 (file)
@@ -448,7 +448,7 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
     }
     // TODO: we should probably skip updating if writeId is from an active txn
     boolean isTxnValid = (writeIdString == null) || ObjectStore.isCurrentStatsValidForTheQuery(
-        conf, params, statsWriteId , writeIdString, false);
+        params, statsWriteId, writeIdString, false);
     return getExistingStatsToUpdate(existingStats, params, isTxnValid);
   }
 
@@ -473,7 +473,7 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
     }
     // TODO: we should probably skip updating if writeId is from an active txn
     if (writeIdString != null && !ObjectStore.isCurrentStatsValidForTheQuery(
-        conf, params, statsWriteId, writeIdString, false)) {
+        params, statsWriteId, writeIdString, false)) {
       return allCols;
     }
     List<String> colsToUpdate = new ArrayList<>();
index b43fb5e..4472f99 100644 (file)
@@ -3684,7 +3684,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             try {
               boolean madeDir = createLocationForAddedPartition(table, partition);
               addedParts.put(new PartValEqWrapperLite(partition), madeDir);
-              initializeAddedPartition(table, partition, madeDir);
+              initializeAddedPartition(table, partition, madeDir, null);
             } catch (MetaException e) {
               throw new IOException(e.getMessage(), e);
             }
@@ -3973,15 +3973,18 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         return true;
     }
 
-    private void initializeAddedPartition(
-        final Table tbl, final Partition part, boolean madeDir) throws MetaException {
-      initializeAddedPartition(tbl, new PartitionSpecProxy.SimplePartitionWrapperIterator(part), madeDir);
+    private void initializeAddedPartition(final Table tbl, final Partition part, boolean madeDir,
+                                          EnvironmentContext environmentContext) throws MetaException {
+      initializeAddedPartition(tbl,
+              new PartitionSpecProxy.SimplePartitionWrapperIterator(part), madeDir, environmentContext);
     }
 
     private void initializeAddedPartition(
-        final Table tbl, final PartitionSpecProxy.PartitionIterator part, boolean madeDir) throws MetaException {
+        final Table tbl, final PartitionSpecProxy.PartitionIterator part, boolean madeDir,
+        EnvironmentContext environmentContext) throws MetaException {
       if (canUpdateStats(tbl)) {
-        MetaStoreServerUtils.updatePartitionStatsFast(part, tbl, wh, madeDir, false, null, true);
+        MetaStoreServerUtils.updatePartitionStatsFast(part, tbl, wh, madeDir,
+                false, environmentContext, true);
       }
 
       // set create time
@@ -4046,7 +4049,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         assert shouldAdd; // start would throw if it already existed here
         boolean madeDir = createLocationForAddedPartition(tbl, part);
         try {
-          initializeAddedPartition(tbl, part, madeDir);
+          initializeAddedPartition(tbl, part, madeDir, envContext);
           initializePartitionParameters(tbl, part);
           success = ms.addPartition(part);
         } finally {
@@ -5989,13 +5992,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
             MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
                     EventType.UPDATE_TABLE_COLUMN_STAT,
-                    new UpdateTableColumnStatEvent(colStats, tableObj, parameters, validWriteIds,
+                    new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
                             writeId, this));
           }
           if (!listeners.isEmpty()) {
             MetaStoreListenerNotifier.notifyEvent(listeners,
                     EventType.UPDATE_TABLE_COLUMN_STAT,
-                    new UpdateTableColumnStatEvent(colStats, tableObj, parameters, validWriteIds,
+                    new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
                             writeId,this));
           }
         }
@@ -6055,13 +6058,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
             MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
               EventType.UPDATE_PARTITION_COLUMN_STAT,
-              new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl, validWriteIds,
+              new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl,
                       writeId, this));
           }
           if (!listeners.isEmpty()) {
             MetaStoreListenerNotifier.notifyEvent(listeners,
               EventType.UPDATE_PARTITION_COLUMN_STAT,
-              new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl, validWriteIds,
+              new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl,
                       writeId, this));
           }
         }
index 0485fe9..c0bae3b 100644 (file)
@@ -8629,7 +8629,7 @@ public class ObjectStore implements RawStore, Configurable {
             // Make sure we set the flag to invalid regardless of the current value.
             StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
             LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the partition "
-                + statsDesc.getDbName() + "." + statsDesc.getTableName() + "." + statsDesc.getPartName());
+                    + statsDesc.getDbName() + "." + statsDesc.getTableName() + "." + statsDesc.getPartName());
           }
           mPartition.setWriteId(writeId);
         }
@@ -12479,7 +12479,7 @@ public class ObjectStore implements RawStore, Configurable {
    */
   private boolean isCurrentStatsValidForTheQuery(MTable tbl, String queryValidWriteIdList,
       boolean isCompleteStatsWriter) throws MetaException {
-    return isCurrentStatsValidForTheQuery(conf, tbl.getParameters(), tbl.getWriteId(),
+    return isCurrentStatsValidForTheQuery(tbl.getParameters(), tbl.getWriteId(),
         queryValidWriteIdList, isCompleteStatsWriter);
   }
 
@@ -12499,19 +12499,19 @@ public class ObjectStore implements RawStore, Configurable {
   private boolean isCurrentStatsValidForTheQuery(MPartition part,
       String queryValidWriteIdList, boolean isCompleteStatsWriter)
       throws MetaException {
-    return isCurrentStatsValidForTheQuery(conf, part.getParameters(), part.getWriteId(),
+    return isCurrentStatsValidForTheQuery(part.getParameters(), part.getWriteId(),
         queryValidWriteIdList, isCompleteStatsWriter);
   }
 
   private boolean isCurrentStatsValidForTheQuery(Partition part, long partWriteId,
       String queryValidWriteIdList, boolean isCompleteStatsWriter)
       throws MetaException {
-    return isCurrentStatsValidForTheQuery(conf, part.getParameters(), partWriteId,
+    return isCurrentStatsValidForTheQuery(part.getParameters(), partWriteId,
         queryValidWriteIdList, isCompleteStatsWriter);
   }
 
   // TODO: move to somewhere else
-  public static boolean isCurrentStatsValidForTheQuery(Configuration conf,
+  public static boolean isCurrentStatsValidForTheQuery(
       Map<String, String> statsParams, long statsWriteId, String queryValidWriteIdList,
       boolean isCompleteStatsWriter) throws MetaException {
 
index 7ad4bd2..182d5cc 100644 (file)
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.HiveAlterHandler;
+import org.apache.hadoop.hive.metastore.HiveMetaException;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType;
 import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator;
@@ -205,7 +206,7 @@ public class CachedStore implements RawStore, Configurable {
    return sharedCache;
   }
 
-  static private ColumnStatistics updateStatsForPart(RawStore rawStore, Table before, String catalogName,
+  static private ColumnStatistics updateStatsForAlterPart(RawStore rawStore, Table before, String catalogName,
                                           String dbName, String tableName, Partition part) throws Exception {
     ColumnStatistics colStats;
     List<String> deletedCols = new ArrayList<>();
@@ -215,32 +216,31 @@ public class CachedStore implements RawStore, Configurable {
       sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName, part.getValues(), column);
     }
     if (colStats != null) {
-      sharedCache.updatePartitionColStatsInCache(catalogName, dbName, tableName, part.getValues(), colStats.getStatsObj());
+      sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, part.getWriteId(),
+              part.getValues(), part.getParameters(), colStats.getStatsObj());
     }
     return colStats;
   }
 
-  static private void updateStatsForTable(RawStore rawStore, Table before, Table after, String catalogName,
+  static private void updateStatsForAlterTable(RawStore rawStore, Table tblBefore, Table tblAfter, String catalogName,
                                           String dbName, String tableName) throws Exception {
     ColumnStatistics colStats = null;
     List<String> deletedCols = new ArrayList<>();
-    if (before.isSetPartitionKeys()) {
+    if (tblBefore.isSetPartitionKeys()) {
       List<Partition> parts = sharedCache.listCachedPartitions(catalogName, dbName, tableName, -1);
       for (Partition part : parts) {
-        colStats = updateStatsForPart(rawStore, before, catalogName, dbName, tableName, part);
+        colStats = updateStatsForAlterPart(rawStore, tblBefore, catalogName, dbName, tableName, part);
       }
     }
 
-    boolean needUpdateAggrStat = false;
-    List<ColumnStatisticsObj> statisticsObjs = HiveAlterHandler.alterTableUpdateTableColumnStats(rawStore, before,
-            after,null, null, rawStore.getConf(), deletedCols);
+    List<ColumnStatisticsObj> statisticsObjs = HiveAlterHandler.alterTableUpdateTableColumnStats(rawStore, tblBefore,
+            tblAfter,null, null, rawStore.getConf(), deletedCols);
     if (colStats != null) {
-      sharedCache.updateTableColStatsInCache(catalogName, dbName, tableName, statisticsObjs);
-      needUpdateAggrStat = true;
+      sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, tblAfter.getWriteId(),
+              statisticsObjs, tblAfter.getParameters());
     }
     for (String column : deletedCols) {
       sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, column);
-      needUpdateAggrStat = true;
     }
   }
 
@@ -309,10 +309,8 @@ public class CachedStore implements RawStore, Configurable {
           sharedCache.alterPartitionInCache(catalogName, dbName, tableName,
                   alterPartitionMessage.getPtnObjBefore().getValues(), alterPartitionMessage.getPtnObjAfter());
           //TODO : Use the stat object stored in the alter table message to update the stats in cache.
-          if (updateStatsForPart(rawStore, alterPartitionMessage.getTableObj(),
-                  catalogName, dbName, tableName, alterPartitionMessage.getPtnObjAfter()) != null) {
-            CacheUpdateMasterWork.updateTableAggregatePartitionColStats(rawStore, catalogName, dbName, tableName);
-          }
+          updateStatsForAlterPart(rawStore, alterPartitionMessage.getTableObj(),
+                  catalogName, dbName, tableName, alterPartitionMessage.getPtnObjAfter());
           break;
         case MessageBuilder.DROP_PARTITION_EVENT:
           DropPartitionMessage dropPartitionMessage = deserializer.getDropPartitionMessage(message);
@@ -329,7 +327,7 @@ public class CachedStore implements RawStore, Configurable {
           AlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(message);
           sharedCache.alterTableInCache(catalogName, dbName, tableName, alterTableMessage.getTableObjAfter());
           //TODO : Use the stat object stored in the alter table message to update the stats in cache.
-          updateStatsForTable(rawStore, alterTableMessage.getTableObjBefore(), alterTableMessage.getTableObjAfter(),
+          updateStatsForAlterTable(rawStore, alterTableMessage.getTableObjBefore(), alterTableMessage.getTableObjAfter(),
                   catalogName, dbName, tableName);
           break;
         case MessageBuilder.DROP_TABLE_EVENT:
@@ -371,8 +369,8 @@ public class CachedStore implements RawStore, Configurable {
           break;
         case MessageBuilder.UPDATE_TBL_COL_STAT_EVENT:
           UpdateTableColumnStatMessage msg = deserializer.getUpdateTableColumnStatMessage(message);
-          updateTableColumnsStatsInternal(rawStore.getConf(), msg.getColumnStatistics(), msg.getParameters(),
-                  msg.getValidWriteIds(), msg.getWriteId());
+          sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, msg.getWriteId(),
+                  msg.getColumnStatistics().getStatsObj(), msg.getParameters());
           break;
         case MessageBuilder.DELETE_TBL_COL_STAT_EVENT:
           DeleteTableColumnStatMessage msgDel = deserializer.getDeleteTableColumnStatMessage(message);
@@ -380,7 +378,8 @@ public class CachedStore implements RawStore, Configurable {
           break;
         case MessageBuilder.UPDATE_PART_COL_STAT_EVENT:
           UpdatePartitionColumnStatMessage msgPartUpdate = deserializer.getUpdatePartitionColumnStatMessage(message);
-          sharedCache.updatePartitionColStatsInCache(catalogName, dbName, tableName, msgPartUpdate.getPartVals(),
+          sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, msgPartUpdate.getWriteId(),
+                  msgPartUpdate.getPartVals(), msgPartUpdate.getParameters(),
                   msgPartUpdate.getColumnStatistics().getStatsObj());
           break;
         case MessageBuilder.DELETE_PART_COL_STAT_EVENT:
@@ -909,7 +908,7 @@ public class CachedStore implements RawStore, Configurable {
           sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
               StringUtils.normalizeIdentifier(dbName),
               StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions,
-              aggrStatsAllButDefaultPartition);
+              aggrStatsAllButDefaultPartition, null);
         }
       } catch (MetaException | NoSuchObjectException e) {
         LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName,
@@ -948,7 +947,12 @@ public class CachedStore implements RawStore, Configurable {
     // the event related to the current transactions are updated in the cache and thus we can support strong
     // consistency in case there is only one metastore.
     if (canUseEvents) {
-      triggerUpdateUsingEvent(rawStore);
+      try {
+        triggerUpdateUsingEvent(rawStore);
+      } catch (Exception e) {
+        //TODO : Not sure how to handle it as the commit is already done in the object store.
+        LOG.error("Failed to update cache", e);
+      }
     }
     return true;
   }
@@ -2000,8 +2004,7 @@ public class CachedStore implements RawStore, Configurable {
       Map<String, String> params, long statsWriteId, String validWriteIds) throws MetaException {
     if (!TxnUtils.isTransactionalTable(tableParams)) return params; // Not a txn table.
     if (areTxnStatsSupported && ((validWriteIds == null)
-        || ObjectStore.isCurrentStatsValidForTheQuery(
-            conf, params, statsWriteId, validWriteIds, false))) {
+        || ObjectStore.isCurrentStatsValidForTheQuery(params, statsWriteId, validWriteIds, false))) {
       // Valid stats are supported for txn tables, and either no verification was requested by the
       // caller, or the verification has succeeded.
       return params;
@@ -2014,14 +2017,14 @@ public class CachedStore implements RawStore, Configurable {
 
 
   // Note: ideally this should be above both CachedStore and ObjectStore.
-  private ColumnStatistics adjustColStatForGet(Map<String, String> tableParams,
-      Map<String, String> params, ColumnStatistics colStat, long statsWriteId,
-      String validWriteIds) throws MetaException {
+  public static ColumnStatistics adjustColStatForGet(Map<String, String> tableParams,
+                                               ColumnStatistics colStat, long statsWriteId,
+      String validWriteIds, boolean areTxnStatsSupported) throws MetaException {
     colStat.setIsStatsCompliant(true);
     if (!TxnUtils.isTransactionalTable(tableParams)) return colStat; // Not a txn table.
     if (areTxnStatsSupported && ((validWriteIds == null)
         || ObjectStore.isCurrentStatsValidForTheQuery(
-            conf, params, statsWriteId, validWriteIds, false))) {
+            tableParams, statsWriteId, validWriteIds, false))) {
       // Valid stats are supported for txn tables, and either no verification was requested by the
       // caller, or the verification has succeeded.
       return colStat;
@@ -2058,7 +2061,7 @@ public class CachedStore implements RawStore, Configurable {
         if (errorMsg != null) {
           throw new MetaException(errorMsg);
         }
-        if (!ObjectStore.isCurrentStatsValidForTheQuery(conf, newParams, table.getWriteId(),
+        if (!ObjectStore.isCurrentStatsValidForTheQuery(newParams, table.getWriteId(),
                 validWriteIds, true)) {
           // Make sure we set the flag to invalid regardless of the current value.
           StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
@@ -2111,11 +2114,14 @@ public class CachedStore implements RawStore, Configurable {
       return rawStore.getTableColumnStatistics(
           catName, dbName, tblName, colNames, validWriteIds);
     }
-    ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName);
-    List<ColumnStatisticsObj> colStatObjs =
-        sharedCache.getTableColStatsFromCache(catName, dbName, tblName, colNames);
-    return adjustColStatForGet(table.getParameters(), table.getParameters(),
-        new ColumnStatistics(csd, colStatObjs), table.getWriteId(), validWriteIds);
+    ColumnStatistics columnStatistics =
+        sharedCache.getTableColStatsFromCache(catName, dbName, tblName, colNames, validWriteIds, areTxnStatsSupported);
+    if (columnStatistics == null) {
+      LOG.info("Stat of Table {}.{} for column {} is not present in cache." +
+              "Getting from raw store", dbName, tblName, colNames);
+      return rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames, validWriteIds);
+    }
+    return columnStatistics;
   }
 
   @Override
@@ -2170,10 +2176,17 @@ public class CachedStore implements RawStore, Configurable {
       String catName, String dbName, String tblName, List<String> partNames,
       List<String> colNames, String writeIdList)
       throws MetaException, NoSuchObjectException {
-    // TODO: why have updatePartitionColumnStatistics cache if this is a bypass?
-    // Note: when implemented, this needs to call adjustColStatForGet, like other get methods.
-    return rawStore.getPartitionColumnStatistics(
-        catName, dbName, tblName, partNames, colNames, writeIdList);
+
+    // If writeIdList is not null, that means stats are requested within a txn context. So set stats compliant to false,
+    // if areTxnStatsSupported is false or the write id which has updated the stats in not compatible with writeIdList.
+    // This is done within table lock as the number of partitions may be more than one and we need a consistent view
+    // for all the partitions.
+    List<ColumnStatistics> columnStatistics = sharedCache.getPartitionColStatsListFromCache(catName, dbName, tblName,
+            partNames, colNames, writeIdList, areTxnStatsSupported);
+    if (columnStatistics == null) {
+      return rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames, writeIdList);
+    }
+    return columnStatistics;
   }
 
   @Override
@@ -2212,8 +2225,8 @@ public class CachedStore implements RawStore, Configurable {
     tblName = StringUtils.normalizeIdentifier(tblName);
     // TODO: we currently cannot do transactional checks for stats here
     //       (incl. due to lack of sync w.r.t. the below rawStore call).
-    //TODO : need to calculate aggregate locally in cached store
-    if (!shouldCacheTable(catName, dbName, tblName) || writeIdList != null || canUseEvents) {
+    // In case the cache is updated using events, aggregate is calculated locally and thus can be read from cache.
+    if (!shouldCacheTable(catName, dbName, tblName) || (writeIdList != null && !canUseEvents)) {
       return rawStore.get_aggr_stats_for(
           catName, dbName, tblName, partNames, colNames, writeIdList);
     }
@@ -2225,45 +2238,68 @@ public class CachedStore implements RawStore, Configurable {
     }
 
     List<String> allPartNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1);
+    StatsType type = StatsType.PARTIAL;
     if (partNames.size() == allPartNames.size()) {
       colStats = sharedCache.getAggrStatsFromCache(catName, dbName, tblName, colNames, StatsType.ALL);
       if (colStats != null) {
         return new AggrStats(colStats, partNames.size());
       }
+      type = StatsType.ALL;
     } else if (partNames.size() == (allPartNames.size() - 1)) {
       String defaultPartitionName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME);
       if (!partNames.contains(defaultPartitionName)) {
-        colStats =
-            sharedCache.getAggrStatsFromCache(catName, dbName, tblName, colNames, StatsType.ALLBUTDEFAULT);
+        colStats = sharedCache.getAggrStatsFromCache(catName, dbName, tblName, colNames, StatsType.ALLBUTDEFAULT);
         if (colStats != null) {
           return new AggrStats(colStats, partNames.size());
         }
+        type = StatsType.ALLBUTDEFAULT;
       }
     }
+
     LOG.debug("Didn't find aggr stats in cache. Merging them. tblName= {}, parts= {}, cols= {}",
         tblName, partNames, colNames);
-    MergedColumnStatsForPartitions mergedColStats =
-        mergeColStatsForPartitions(catName, dbName, tblName, partNames, colNames, sharedCache);
+    MergedColumnStatsForPartitions mergedColStats = mergeColStatsForPartitions(catName, dbName, tblName,
+              partNames, colNames, sharedCache, type, writeIdList);
+    if (mergedColStats == null) {
+      LOG.info("Aggregate stats of partition " + TableName.getQualified(catName, dbName, tblName) + "." +
+              partNames + " for columns " + colNames + " is not present in cache. Getting it from raw store");
+      return rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, writeIdList);
+    }
     return new AggrStats(mergedColStats.getColStats(), mergedColStats.getPartsFound());
   }
 
   private MergedColumnStatsForPartitions mergeColStatsForPartitions(
       String catName, String dbName, String tblName, List<String> partNames, List<String> colNames,
-      SharedCache sharedCache) throws MetaException {
+      SharedCache sharedCache, StatsType type, String writeIdList) throws MetaException {
     final boolean useDensityFunctionForNDVEstimation =
         MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_NDV_DENSITY_FUNCTION);
     final double ndvTuner = MetastoreConf.getDoubleVar(getConf(), ConfVars.STATS_NDV_TUNER);
     Map<ColumnStatsAggregator, List<ColStatsObjWithSourceInfo>> colStatsMap = new HashMap<>();
-    boolean areAllPartsFound = true;
-    long partsFound = 0;
+    long partsFound = partNames.size();
+    Map<List<String>, Long> partNameToWriteId = writeIdList != null ? new HashMap<>() : null;
     for (String colName : colNames) {
       long partsFoundForColumn = 0;
       ColumnStatsAggregator colStatsAggregator = null;
       List<ColStatsObjWithSourceInfo> colStatsWithPartInfoList = new ArrayList<>();
       for (String partName : partNames) {
-        ColumnStatisticsObj colStatsForPart =
-            sharedCache.getPartitionColStatsFromCache(catName, dbName, tblName, partNameToVals(partName), colName);
-        if (colStatsForPart != null) {
+        List<String> partValue = partNameToVals(partName);
+        // There are three possible result from getPartitionColStatsFromCache.
+        // 1. The partition has valid stats and thus colStatsWriteId returned is valid non-null value
+        // 2. Partition stat is missing from cache and thus colStatsWriteId returned is non-null but colstat
+        //    info in it is null. In this case we just ignore the partition from aggregate calculation to keep
+        //    the behavior same as object store.
+        // 3. Partition is missing or its stat is updated by live(not yet committed) or aborted txn. In this case,
+        //    colStatsWriteId is null. Thus null is returned to keep the behavior same as object store.
+        SharedCache.ColumStatsWithWriteId colStatsWriteId = sharedCache.getPartitionColStatsFromCache(catName, dbName,
+                tblName, partValue, colName, writeIdList);
+        if (colStatsWriteId == null) {
+          return null;
+        }
+        if (colStatsWriteId.getColumnStatisticsObj() != null) {
+          ColumnStatisticsObj colStatsForPart = colStatsWriteId.getColumnStatisticsObj();
+          if (partNameToWriteId != null) {
+            partNameToWriteId.put(partValue, colStatsWriteId.getWriteId());
+          }
           ColStatsObjWithSourceInfo colStatsWithPartInfo =
               new ColStatsObjWithSourceInfo(colStatsForPart, catName, dbName, tblName, partName);
           colStatsWithPartInfoList.add(colStatsWithPartInfo);
@@ -2282,7 +2318,9 @@ public class CachedStore implements RawStore, Configurable {
       if (colStatsWithPartInfoList.size() > 0) {
         colStatsMap.put(colStatsAggregator, colStatsWithPartInfoList);
       }
-      if (partsFoundForColumn == partNames.size()) {
+      // set partsFound to the min(partsFoundForColumn) for all columns. partsFound is the number of partitions, for
+      // which stats for all columns are present in the cache.
+      if (partsFoundForColumn < partsFound) {
         partsFound = partsFoundForColumn;
       }
       if (colStatsMap.size() < 1) {
@@ -2293,8 +2331,23 @@ public class CachedStore implements RawStore, Configurable {
     }
     // Note that enableBitVector does not apply here because ColumnStatisticsObj
     // itself will tell whether bitvector is null or not and aggr logic can automatically apply.
-    return new MergedColumnStatsForPartitions(MetaStoreServerUtils.aggrPartitionStats(colStatsMap,
-        partNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner), partsFound);
+    List<ColumnStatisticsObj> colAggrStats = MetaStoreServerUtils.aggrPartitionStats(colStatsMap,
+            partNames, partsFound == partNames.size(), useDensityFunctionForNDVEstimation, ndvTuner);
+
+    if (canUseEvents) {
+      if (type == StatsType.ALL) {
+        sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
+                StringUtils.normalizeIdentifier(dbName),
+                StringUtils.normalizeIdentifier(tblName), new AggrStats(colAggrStats, partsFound),
+                null, partNameToWriteId);
+      } else if (type == StatsType.ALLBUTDEFAULT) {
+        sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
+                StringUtils.normalizeIdentifier(dbName),
+                StringUtils.normalizeIdentifier(tblName), null,
+                new AggrStats(colAggrStats, partsFound), partNameToWriteId);
+      }
+    }
+    return new MergedColumnStatsForPartitions(colAggrStats, partsFound);
   }
 
   class MergedColumnStatsForPartitions {
index ce9e383..1c23022 100644 (file)
@@ -33,13 +33,18 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.TreeMap;
 
-import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.HiveMetaException;
+import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -47,6 +52,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator;
@@ -56,6 +62,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import static org.apache.hadoop.hive.metastore.cache.CachedStore.partNameToVals;
 import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
 public class SharedCache {
@@ -85,7 +92,7 @@ public class SharedCache {
   private static HashMap<Class<?>, ObjectEstimator> sizeEstimators = null;
 
   enum StatsType {
-    ALL(0), ALLBUTDEFAULT(1);
+    ALL(0), ALLBUTDEFAULT(1), PARTIAL(2);
 
     private final int position;
 
@@ -249,6 +256,7 @@ public class SharedCache {
         tableLock.readLock().lock();
         PartitionWrapper wrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals));
         if (wrapper == null) {
+          LOG.debug("Partition: " + partVals + " is not present in the cache.");
           return null;
         }
         part = CacheUtils.assemble(wrapper, sharedCache);
@@ -342,6 +350,26 @@ public class SharedCache {
       }
     }
 
+    public void alterPartitionAndStats(List<String> partVals, SharedCache sharedCache, long writeId,
+                                       Map<String,String> parameters, List<ColumnStatisticsObj> colStatsObjs) {
+      try {
+        tableLock.writeLock().lock();
+        PartitionWrapper partitionWrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals));
+        if (partitionWrapper == null) {
+          LOG.info("Partition " + partVals + " is missing from cache. Cannot update the partition stats in cache.");
+          return;
+        }
+        Partition newPart = partitionWrapper.getPartition();
+        newPart.setParameters(parameters);
+        newPart.setWriteId(writeId);
+        removePartition(partVals, sharedCache);
+        cachePartition(newPart, sharedCache);
+        updatePartitionColStats(partVals, colStatsObjs);
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
     public void alterPartitions(List<List<String>> partValsList, List<Partition> newParts,
         SharedCache sharedCache) {
       try {
@@ -445,7 +473,9 @@ public class SharedCache {
       }
     }
 
-    public List<ColumnStatisticsObj> getCachedTableColStats(List<String> colNames) {
+    public ColumnStatistics getCachedTableColStats(ColumnStatisticsDesc csd, List<String> colNames,
+                                                            String validWriteIds, boolean areTxnStatsSupported)
+            throws MetaException {
       List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>();
       try {
         tableLock.readLock().lock();
@@ -455,10 +485,11 @@ public class SharedCache {
             colStatObjs.add(colStatObj);
           }
         }
+        return CachedStore.adjustColStatForGet(getTable().getParameters(), new ColumnStatistics(csd, colStatObjs),
+                getTable().getWriteId(), validWriteIds, areTxnStatsSupported);
       } finally {
         tableLock.readLock().unlock();
       }
-      return colStatObjs;
     }
 
     public void removeTableColStats(String colName) {
@@ -485,16 +516,88 @@ public class SharedCache {
       }
     }
 
-    public ColumnStatisticsObj getPartitionColStats(List<String> partVal, String colName) {
+    public ColumStatsWithWriteId getPartitionColStats(List<String> partVal, String colName, String writeIdList) {
       try {
         tableLock.readLock().lock();
-        return partitionColStatsCache
-            .get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
+        ColumnStatisticsObj statisticsObj =
+                partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
+        if (statisticsObj == null || writeIdList == null) {
+          return new ColumStatsWithWriteId(-1, statisticsObj);
+        }
+        PartitionWrapper wrapper = partitionCache.get(CacheUtils.buildPartitionCacheKey(partVal));
+        if (wrapper == null) {
+          LOG.info("Partition: " + partVal + " is not present in the cache. Cannot update stats in cache.");
+          return null;
+        }
+        long writeId = wrapper.getPartition().getWriteId();
+        ValidWriteIdList list4TheQuery = new ValidReaderWriteIdList(writeIdList);
+        // Just check if the write ID is valid. If it's valid (i.e. we are allowed to see it),
+        // that means it cannot possibly be a concurrent write. If it's not valid (we are not
+        // allowed to see it), that means it's either concurrent or aborted, same thing for us.
+        if (!list4TheQuery.isWriteIdValid(writeId)) {
+          LOG.debug("Write id list " + writeIdList + " is not compatible with write id " + writeId);
+          return null;
+        }
+        return new ColumStatsWithWriteId(writeId, statisticsObj);
       } finally {
         tableLock.readLock().unlock();
       }
     }
 
+    public List<ColumnStatistics> getPartColStatsList(List<String> partNames, List<String> colNames,
+                                              String writeIdList, boolean txnStatSupported) throws MetaException {
+      List<ColumnStatistics> colStatObjs = new ArrayList<>();
+      try {
+        tableLock.readLock().lock();
+        Table tbl = getTable();
+        for (String partName : partNames) {
+          ColumnStatisticsDesc csd = new ColumnStatisticsDesc(false,
+                  tbl.getDbName(), tbl.getTableName());
+          csd.setCatName(tbl.getCatName());
+          csd.setPartName(partName);
+          csd.setLastAnalyzed(0); //TODO : Need to get last analysed. This is not being used by anybody now.
+          List<ColumnStatisticsObj> statObject = new ArrayList<>();
+          List<String> partVal =  Warehouse.getPartValuesFromPartName(partName);
+          for (String colName : colNames) {
+            ColumnStatisticsObj statisticsObj =
+                    partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
+            if (statisticsObj != null) {
+              statObject.add(statisticsObj);
+            } else {
+              LOG.info("Stats not available in cachedStore for col " + colName + " in partition " + partVal);
+              return null;
+            }
+          }
+          ColumnStatistics columnStatistics = new ColumnStatistics(csd, statObject);
+          if (writeIdList != null && TxnUtils.isTransactionalTable(getParameters())) {
+            columnStatistics.setIsStatsCompliant(true);
+            if (!txnStatSupported) {
+              columnStatistics.setIsStatsCompliant(false);
+            } else {
+              PartitionWrapper wrapper =
+                      partitionCache.get(CacheUtils.buildPartitionCacheKey(partVal));
+              if (wrapper == null) {
+                columnStatistics.setIsStatsCompliant(false);
+              } else {
+                Partition partition = wrapper.getPartition();
+                if (!ObjectStore.isCurrentStatsValidForTheQuery(partition.getParameters(),
+                        partition.getWriteId(), writeIdList, false)) {
+                  LOG.debug("The current cached store transactional partition column statistics for {}.{}.{} "
+                                  + "(write ID {}) are not valid for current query ({})", tbl.getDbName(),
+                          tbl.getTableName(), partName, partition.getWriteId(), writeIdList);
+                  columnStatistics.setIsStatsCompliant(false);
+                }
+              }
+            }
+          }
+          colStatObjs.add(columnStatistics);
+        }
+      } finally {
+        tableLock.readLock().unlock();
+      }
+      return colStatObjs;
+    }
+
     public boolean updatePartitionColStats(List<String> partVal,
         List<ColumnStatisticsObj> colStatsObjs) {
       try {
@@ -661,11 +764,30 @@ public class SharedCache {
     }
 
     public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions,
-        AggrStats aggrStatsAllButDefaultPartition) {
+        AggrStats aggrStatsAllButDefaultPartition, SharedCache sharedCache, Map<List<String>, Long> partNameToWriteId) {
       Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache =
           new HashMap<String, List<ColumnStatisticsObj>>();
       try {
         tableLock.writeLock().lock();
+        if (partNameToWriteId != null) {
+          for (Entry<List<String>, Long> partValuesWriteIdSet : partNameToWriteId.entrySet()) {
+            List<String> partValues = partValuesWriteIdSet.getKey();
+            Partition partition = getPartition(partValues, sharedCache);
+            if (partition == null) {
+              LOG.info("Could not refresh the aggregate stat as partition " + partValues + " does not exist");
+              return;
+            }
+
+            // for txn tables, if the write id is modified means the partition is updated post fetching of stats. So
+            // skip updating the aggregate stats in the cache.
+            long writeId = partition.getWriteId();
+            if (writeId != partValuesWriteIdSet.getValue()) {
+              LOG.info("Could not refresh the aggregate stat as partition " + partValues + " has write id " +
+                      partValuesWriteIdSet.getValue() + " instead of " + writeId);
+              return;
+            }
+          }
+        }
         if (aggrStatsAllPartitions != null) {
           for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) {
             if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
@@ -794,6 +916,23 @@ public class SharedCache {
     }
   }
 
+  public static class ColumStatsWithWriteId {
+    private long writeId;
+    private ColumnStatisticsObj columnStatisticsObj;
+    public ColumStatsWithWriteId(long writeId, ColumnStatisticsObj columnStatisticsObj) {
+      this.writeId = writeId;
+      this.columnStatisticsObj = columnStatisticsObj;
+    }
+
+    public long getWriteId() {
+      return writeId;
+    }
+
+    public ColumnStatisticsObj getColumnStatisticsObj() {
+      return columnStatisticsObj;
+    }
+  }
+
   public void populateCatalogsInCache(Collection<Catalog> catalogs) {
     for (Catalog cat : catalogs) {
       Catalog catCopy = cat.deepCopy();
@@ -1205,6 +1344,30 @@ public class SharedCache {
     }
   }
 
+  public void alterTableAndStatsInCache(String catName, String dbName, String tblName, long writeId,
+                                        List<ColumnStatisticsObj> colStatsObjs, Map<String,String> newParams) {
+    try {
+      cacheLock.writeLock().lock();
+      TableWrapper tblWrapper =
+              tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName));
+      if (tblWrapper == null) {
+        LOG.info("Table " + tblName + " is missing from cache. Cannot update table stats in cache");
+        return;
+      }
+      Table newTable = tblWrapper.getTable();
+      newTable.setWriteId(writeId);
+      newTable.setParameters(newParams);
+      //tblWrapper.updateTableObj(newTable, this);
+      String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName());
+      String newTblName = StringUtils.normalizeIdentifier(newTable.getTableName());
+      tableCache.put(CacheUtils.buildTableKey(catName, newDbName, newTblName), tblWrapper);
+      tblWrapper.updateTableColStats(colStatsObjs);
+      isTableCacheDirty.set(true);
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
+  }
+
   public List<Table> listCachedTables(String catName, String dbName) {
     List<Table> tables = new ArrayList<>();
     try {
@@ -1299,19 +1462,20 @@ public class SharedCache {
     }
   }
 
-  public List<ColumnStatisticsObj> getTableColStatsFromCache(String catName, String dbName,
-      String tblName, List<String> colNames) {
-    List<ColumnStatisticsObj> colStatObjs = new ArrayList<>();
+  public ColumnStatistics getTableColStatsFromCache(String catName, String dbName,
+      String tblName, List<String> colNames, String validWriteIds, boolean areTxnStatsSupported) throws MetaException {
     try {
       cacheLock.readLock().lock();
       TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
-      if (tblWrapper != null) {
-        colStatObjs = tblWrapper.getCachedTableColStats(colNames);
+      if (tblWrapper == null) {
+        LOG.info("Table " + tblName + " is missing from cache.");
+        return null;
       }
+      ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName);
+      return tblWrapper.getCachedTableColStats(csd, colNames, validWriteIds, areTxnStatsSupported);
     } finally {
       cacheLock.readLock().unlock();
     }
-    return colStatObjs;
   }
 
   public void removeTableColStatsFromCache(String catName, String dbName, String tblName,
@@ -1321,6 +1485,8 @@ public class SharedCache {
       TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.removeTableColStats(colName);
+      } else {
+        LOG.info("Table " + tblName + " is missing from cache.");
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1333,6 +1499,8 @@ public class SharedCache {
       TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.removeAllTableColStats();
+      } else {
+        LOG.info("Table " + tblName + " is missing from cache.");
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1347,6 +1515,8 @@ public class SharedCache {
           tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
       if (tblWrapper != null) {
         tblWrapper.updateTableColStats(colStatsForTable);
+      } else {
+        LOG.info("Table " + tableName + " is missing from cache.");
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1361,6 +1531,8 @@ public class SharedCache {
           tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
       if (tblWrapper != null) {
         tblWrapper.refreshTableColStats(colStatsForTable);
+      } else {
+        LOG.info("Table " + tableName + " is missing from cache.");
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1513,6 +1685,20 @@ public class SharedCache {
     }
   }
 
+  public void alterPartitionAndStatsInCache(String catName, String dbName, String tblName, long writeId,
+                                            List<String> partVals, Map<String,String> parameters,
+                                            List<ColumnStatisticsObj> colStatsObjs) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.alterPartitionAndStats(partVals, this, writeId, parameters, colStatsObjs);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+  }
+
   public void alterPartitionsInCache(String catName, String dbName, String tblName,
       List<List<String>> partValsList, List<Partition> newParts) {
     try {
@@ -1578,14 +1764,14 @@ public class SharedCache {
     }
   }
 
-  public ColumnStatisticsObj getPartitionColStatsFromCache(String catName, String dbName,
-      String tblName, List<String> partVal, String colName) {
-    ColumnStatisticsObj colStatObj = null;
+  public ColumStatsWithWriteId getPartitionColStatsFromCache(String catName, String dbName,
+      String tblName, List<String> partVal, String colName, String writeIdList) {
+    ColumStatsWithWriteId colStatObj = null;
     try {
       cacheLock.readLock().lock();
       TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
-        colStatObj = tblWrapper.getPartitionColStats(partVal, colName);
+        colStatObj = tblWrapper.getPartitionColStats(partVal, colName, writeIdList);
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1593,6 +1779,24 @@ public class SharedCache {
     return colStatObj;
   }
 
+  public List<ColumnStatistics> getPartitionColStatsListFromCache(String catName, String dbName, String tblName,
+                                                                  List<String> partNames, List<String> colNames,
+                                                                  String writeIdList, boolean txnStatSupported) {
+    List<ColumnStatistics> colStatObjs = null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      if (tblWrapper != null) {
+        colStatObjs = tblWrapper.getPartColStatsList(partNames, colNames, writeIdList, txnStatSupported);
+      }
+    } catch (MetaException e) {
+      LOG.warn("Failed to get partition column statistics");
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+    return colStatObjs;
+  }
+
   public void refreshPartitionColStatsInCache(String catName, String dbName, String tblName,
       List<ColumnStatistics> partitionColStats) {
     try {
@@ -1635,13 +1839,14 @@ public class SharedCache {
   }
 
   public void refreshAggregateStatsInCache(String catName, String dbName, String tblName,
-      AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+      AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition,
+                                           Map<List<String>, Long> partNameToWriteId) {
     try {
       cacheLock.readLock().lock();
       TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions,
-            aggrStatsAllButDefaultPartition);
+            aggrStatsAllButDefaultPartition, this, partNameToWriteId);
       }
     } finally {
       cacheLock.readLock().unlock();
index 094f799..ba61a08 100644 (file)
@@ -35,7 +35,6 @@ import java.util.Map;
 @InterfaceStability.Stable
 public class UpdatePartitionColumnStatEvent extends ListenerEvent {
   private ColumnStatistics partColStats;
-  private String validWriteIds;
   private long writeId;
   private Map<String, String> parameters;
   private List<String> partVals;
@@ -45,16 +44,14 @@ public class UpdatePartitionColumnStatEvent extends ListenerEvent {
    * @param statsObj Columns statistics Info.
    * @param partVals partition names
    * @param parameters table parameters to be updated after stats are updated.
-   * @param validWriteIds valid write id list for the query.
+   * @param tableObj table object
    * @param writeId writeId for the query.
    * @param handler handler that is firing the event
    */
   public UpdatePartitionColumnStatEvent(ColumnStatistics statsObj, List<String> partVals, Map<String, String> parameters,
-                                        Table tableObj, String validWriteIds, long writeId,
-                                        IHMSHandler handler) {
+                                        Table tableObj, long writeId, IHMSHandler handler) {
     super(true, handler);
     this.partColStats = statsObj;
-    this.validWriteIds = validWriteIds;
     this.writeId = writeId;
     this.parameters = parameters;
     this.partVals = partVals;
@@ -71,7 +68,6 @@ public class UpdatePartitionColumnStatEvent extends ListenerEvent {
     super(true, handler);
     this.partColStats = statsObj;
     this.partVals = partVals;
-    this.validWriteIds = null;
     this.writeId = 0;
     this.parameters = null;
     this.tableObj = tableObj;
@@ -81,10 +77,6 @@ public class UpdatePartitionColumnStatEvent extends ListenerEvent {
     return partColStats;
   }
 
-  public String getValidWriteIds() {
-    return validWriteIds;
-  }
-
   public long getWriteId() {
     return writeId;
   }
index 3f988bb..71300ab 100644 (file)
@@ -35,24 +35,22 @@ import java.util.Map;
 @InterfaceStability.Stable
 public class UpdateTableColumnStatEvent extends ListenerEvent {
   private ColumnStatistics colStats;
-  private String validWriteIds;
   private long writeId;
   private Map<String, String> parameters;
   private Table tableObj;
 
   /**
    * @param colStats Columns statistics Info.
+   * @param tableObj table object
    * @param parameters table parameters to be updated after stats are updated.
-   * @param validWriteIds valid write id list for the query.
-   * @param colStats writeId for the query.
+   * @param writeId writeId for the query.
    * @param handler handler that is firing the event
    */
   public UpdateTableColumnStatEvent(ColumnStatistics colStats, Table tableObj,
-                                    Map<String, String> parameters, String validWriteIds,
+                                    Map<String, String> parameters,
                                     long writeId, IHMSHandler handler) {
     super(true, handler);
     this.colStats = colStats;
-    this.validWriteIds = validWriteIds;
     this.writeId = writeId;
     this.parameters = parameters;
     this.tableObj = tableObj;
@@ -65,7 +63,6 @@ public class UpdateTableColumnStatEvent extends ListenerEvent {
   public UpdateTableColumnStatEvent(ColumnStatistics colStats, IHMSHandler handler) {
     super(true, handler);
     this.colStats = colStats;
-    this.validWriteIds = null;
     this.writeId = 0;
     this.parameters = null;
     this.tableObj = null;
@@ -75,10 +72,6 @@ public class UpdateTableColumnStatEvent extends ListenerEvent {
     return colStats;
   }
 
-  public String getValidWriteIds() {
-    return validWriteIds;
-  }
-
   public long getWriteId() {
     return writeId;
   }
index 10c6b44..15c4769 100644 (file)
@@ -289,9 +289,9 @@ public class MessageBuilder {
   public JSONUpdateTableColumnStatMessage buildUpdateTableColumnStatMessage(ColumnStatistics colStats,
                                                                             Table tableObj,
                                                                             Map<String, String> parameters,
-                                                                            String validWriteIds, long writeId) {
+                                                                            long writeId) {
     return new JSONUpdateTableColumnStatMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(),
-            colStats, tableObj, parameters, validWriteIds, writeId);
+            colStats, tableObj, parameters, writeId);
   }
 
   public JSONDeleteTableColumnStatMessage buildDeleteTableColumnStatMessage(String dbName, String colName) {
@@ -300,9 +300,9 @@ public class MessageBuilder {
 
   public JSONUpdatePartitionColumnStatMessage buildUpdatePartitionColumnStatMessage(ColumnStatistics colStats,
                                                             List<String> partVals, Map<String, String> parameters,
-                                                            Table tableObj, String validWriteIds, long writeId) {
+                                                            Table tableObj, long writeId) {
     return new JSONUpdatePartitionColumnStatMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), colStats, partVals,
-            parameters, tableObj, validWriteIds, writeId);
+            parameters, tableObj, writeId);
   }
 
   public JSONDeletePartitionColumnStatMessage buildDeletePartitionColumnStatMessage(String dbName, String colName,
index 7eb6c07..e92a0dc 100644 (file)
@@ -34,8 +34,6 @@ public abstract class UpdatePartitionColumnStatMessage extends EventMessage {
 
   public abstract ColumnStatistics getColumnStatistics();
 
-  public abstract String getValidWriteIds();
-
   public abstract Long getWriteId();
 
   public abstract Map<String, String> getParameters();
index 7919b0e..e3f049c 100644 (file)
@@ -33,8 +33,6 @@ public abstract class UpdateTableColumnStatMessage extends EventMessage {
 
   public abstract ColumnStatistics getColumnStatistics();
 
-  public abstract String getValidWriteIds();
-
   public abstract Long getWriteId();
 
   public abstract Map<String, String> getParameters();
index 1b35df5..fd7fe00 100644 (file)
@@ -38,7 +38,7 @@ public class JSONUpdatePartitionColumnStatMessage extends UpdatePartitionColumnS
   private Long writeId, timestamp;
 
   @JsonProperty
-  private String validWriteIds, server, servicePrincipal, database;
+  private String server, servicePrincipal, database;
 
   @JsonProperty
   private String colStatsJson;
@@ -61,12 +61,11 @@ public class JSONUpdatePartitionColumnStatMessage extends UpdatePartitionColumnS
   public JSONUpdatePartitionColumnStatMessage(String server, String servicePrincipal, Long timestamp,
                                               ColumnStatistics colStats, List<String> partVals,
                                               Map<String, String> parameters,
-                                              Table tableObj, String validWriteIds, long writeId) {
+                                              Table tableObj, long writeId) {
     this.timestamp = timestamp;
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.writeId = writeId;
-    this.validWriteIds = validWriteIds;
     this.database = colStats.getStatsDesc().getDbName();
     this.partVals = partVals;
     try {
@@ -108,11 +107,6 @@ public class JSONUpdatePartitionColumnStatMessage extends UpdatePartitionColumnS
   }
 
   @Override
-  public String getValidWriteIds() {
-    return validWriteIds;
-  }
-
-  @Override
   public Long getWriteId() {
     return writeId;
   }
index c932b7c..275d204 100644 (file)
@@ -36,7 +36,7 @@ public class JSONUpdateTableColumnStatMessage extends UpdateTableColumnStatMessa
   private Long writeId, timestamp;
 
   @JsonProperty
-  private String validWriteIds, server, servicePrincipal, database;
+  private String server, servicePrincipal, database;
 
   @JsonProperty
   private String colStatsJson;
@@ -55,12 +55,11 @@ public class JSONUpdateTableColumnStatMessage extends UpdateTableColumnStatMessa
 
   public JSONUpdateTableColumnStatMessage(String server, String servicePrincipal, Long timestamp,
                       ColumnStatistics colStats, Table tableObj, Map<String, String> parameters,
-                                          String validWriteIds, long writeId) {
+                                           long writeId) {
     this.timestamp = timestamp;
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.writeId = writeId;
-    this.validWriteIds = validWriteIds;
     this.database = colStats.getStatsDesc().getDbName();
     try {
       this.colStatsJson = MessageBuilder.createTableColumnStatJson(colStats);
@@ -106,11 +105,6 @@ public class JSONUpdateTableColumnStatMessage extends UpdateTableColumnStatMessa
   }
 
   @Override
-  public String getValidWriteIds() {
-    return validWriteIds;
-  }
-
-  @Override
   public Long getWriteId() {
     return writeId;
   }
index 2f102a2..02ff4ae 100644 (file)
@@ -177,6 +177,8 @@ CREATE TABLE "APP"."NOTIFICATION_LOG" (
     "MESSAGE_FORMAT" VARCHAR(16)
 );
 
+CREATE UNIQUE INDEX "APP"."NOTIFICATION_LOG_EVENT_ID" ON "APP"."NOTIFICATION_LOG" ("EVENT_ID");
+
 CREATE TABLE "APP"."NOTIFICATION_SEQUENCE" ("NNI_ID" BIGINT NOT NULL, "NEXT_EVENT_ID" BIGINT NOT NULL);
 
 CREATE TABLE "APP"."KEY_CONSTRAINTS" ("CHILD_CD_ID" BIGINT, "CHILD_INTEGER_IDX" INTEGER, "CHILD_TBL_ID" BIGINT, "PARENT_CD_ID" BIGINT , "PARENT_INTEGER_IDX" INTEGER, "PARENT_TBL_ID" BIGINT NOT NULL,  "POSITION" BIGINT NOT NULL, "CONSTRAINT_NAME" VARCHAR(400) NOT NULL, "CONSTRAINT_TYPE" SMALLINT NOT NULL, "UPDATE_RULE" SMALLINT, "DELETE_RULE" SMALLINT, "ENABLE_VALIDATE_RELY" SMALLINT NOT NULL, "DEFAULT_VALUE" VARCHAR(400));
index 8ad56fc..1a1e34a 100644 (file)
@@ -9,6 +9,9 @@ UPDATE "APP"."WM_RESOURCEPLAN" SET NS = 'default' WHERE NS IS NULL;
 DROP INDEX "APP"."UNIQUE_WM_RESOURCEPLAN";
 CREATE UNIQUE INDEX "APP"."UNIQUE_WM_RESOURCEPLAN" ON "APP"."WM_RESOURCEPLAN" ("NS", "NAME");
 
+-- HIVE-21063
+CREATE UNIQUE INDEX "APP"."NOTIFICATION_LOG_EVENT_ID" ON "APP"."NOTIFICATION_LOG" ("EVENT_ID");
+
 -- This needs to be the last thing done.  Insert any changes above this line.
 UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 
index 383d3bc..4f58343 100644 (file)
@@ -629,6 +629,8 @@ CREATE TABLE NOTIFICATION_LOG
 
 ALTER TABLE NOTIFICATION_LOG ADD CONSTRAINT NOTIFICATION_LOG_PK PRIMARY KEY (NL_ID);
 
+CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG (EVENT_ID);
+
 CREATE TABLE NOTIFICATION_SEQUENCE
 (
     NNI_ID bigint NOT NULL,
index edde08d..e0d143a 100644 (file)
@@ -10,6 +10,9 @@ UPDATE WM_RESOURCEPLAN SET NS = 'default' WHERE NS IS NULL;
 DROP INDEX UNIQUE_WM_RESOURCEPLAN ON WM_RESOURCEPLAN;
 CREATE UNIQUE INDEX UNIQUE_WM_RESOURCEPLAN ON WM_RESOURCEPLAN ("NS", "NAME");
 
+-- HIVE-21063
+CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG (EVENT_ID);
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
index 5466537..8db11d3 100644 (file)
@@ -871,6 +871,8 @@ CREATE TABLE IF NOT EXISTS `NOTIFICATION_LOG`
     PRIMARY KEY (`NL_ID`)
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
+CREATE UNIQUE INDEX `NOTIFICATION_LOG_EVENT_ID` ON NOTIFICATION_LOG (`EVENT_ID`) USING BTREE;
+
 CREATE TABLE IF NOT EXISTS `NOTIFICATION_SEQUENCE`
 (
     `NNI_ID` BIGINT(20) NOT NULL,
index 701acb0..47c3831 100644 (file)
@@ -11,6 +11,9 @@ UPDATE `WM_RESOURCEPLAN` SET `NS` = 'default' WHERE `NS` IS NULL;
 ALTER TABLE `WM_RESOURCEPLAN` DROP KEY `UNIQUE_WM_RESOURCEPLAN`;
 ALTER TABLE `WM_RESOURCEPLAN` ADD UNIQUE KEY `UNIQUE_WM_RESOURCEPLAN` (`NAME`, `NS`);
 
+-- HIVE-21063
+CREATE UNIQUE INDEX `NOTIFICATION_LOG_EVENT_ID` ON NOTIFICATION_LOG (`EVENT_ID`) USING BTREE;
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS ' ';
index 2a9c38f..8af9a76 100644 (file)
@@ -624,6 +624,8 @@ CREATE TABLE NOTIFICATION_LOG
 
 ALTER TABLE NOTIFICATION_LOG ADD CONSTRAINT NOTIFICATION_LOG_PK PRIMARY KEY (NL_ID);
 
+CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG(EVENT_ID);
+
 CREATE TABLE NOTIFICATION_SEQUENCE
 (
     NNI_ID NUMBER NOT NULL,
index b9f6331..231376b 100644 (file)
@@ -9,6 +9,9 @@ UPDATE WM_RESOURCEPLAN SET NS = 'default' WHERE NS IS NULL;
 DROP INDEX UNIQUE_WM_RESOURCEPLAN;
 CREATE UNIQUE INDEX UNIQUE_WM_RESOURCEPLAN ON WM_RESOURCEPLAN (NS, "NAME");
 
+-- HIVE-21063
+CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG(EVENT_ID);
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual;
index 0a359d9..2aff1e4 100644 (file)
@@ -631,6 +631,8 @@ CREATE TABLE "NOTIFICATION_LOG"
     PRIMARY KEY ("NL_ID")
 );
 
+CREATE UNIQUE INDEX "NOTIFICATION_LOG_EVENT_ID" ON "NOTIFICATION_LOG" USING btree ("EVENT_ID");
+
 CREATE TABLE "NOTIFICATION_SEQUENCE"
 (
     "NNI_ID" BIGINT NOT NULL,
index 0c36069..2d4363b 100644 (file)
@@ -11,6 +11,9 @@ UPDATE "WM_RESOURCEPLAN" SET "NS" = 'default' WHERE "NS" IS NULL;
 ALTER TABLE "WM_RESOURCEPLAN" DROP CONSTRAINT "UNIQUE_WM_RESOURCEPLAN";
 ALTER TABLE ONLY "WM_RESOURCEPLAN" ADD CONSTRAINT "UNIQUE_WM_RESOURCEPLAN" UNIQUE ("NS", "NAME");
 
+-- HIVE-21063
+CREATE UNIQUE INDEX "NOTIFICATION_LOG_EVENT_ID" ON "NOTIFICATION_LOG" USING btree ("EVENT_ID");
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0';
index 7429d18..77e0c98 100644 (file)
@@ -95,6 +95,7 @@ import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.Type;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.thrift.TException;
 import org.junit.Test;
@@ -3325,22 +3326,23 @@ public abstract class TestHiveMetaStore {
     Warehouse wh = mock(Warehouse.class);
     //Execute initializeAddedPartition() and it should not trigger updatePartitionStatsFast() as DO_NOT_UPDATE_STATS is true
     HiveMetaStore.HMSHandler hms = new HiveMetaStore.HMSHandler("", conf, false);
-    Method m = hms.getClass().getDeclaredMethod("initializeAddedPartition", Table.class, Partition.class, boolean.class);
+    Method m = hms.getClass().getDeclaredMethod("initializeAddedPartition", Table.class, Partition.class,
+            boolean.class, EnvironmentContext.class);
     m.setAccessible(true);
     //Invoke initializeAddedPartition();
-    m.invoke(hms, tbl, part, false);
+    m.invoke(hms, tbl, part, false, null);
     verify(wh, never()).getFileStatusesForLocation(part.getSd().getLocation());
 
     //Remove tbl's DO_NOT_UPDATE_STATS & set STATS_AUTO_GATHER = false
     tbl.unsetParameters();
     MetastoreConf.setBoolVar(conf, ConfVars.STATS_AUTO_GATHER, false);
-    m.invoke(hms, tbl, part, false);
+    m.invoke(hms, tbl, part, false, null);
     verify(wh, never()).getFileStatusesForLocation(part.getSd().getLocation());
 
     //Set STATS_AUTO_GATHER = true and set tbl as a VIRTUAL_VIEW
     MetastoreConf.setBoolVar(conf, ConfVars.STATS_AUTO_GATHER, true);
     tbl.setTableType("VIRTUAL_VIEW");
-    m.invoke(hms, tbl, part, false);
+    m.invoke(hms, tbl, part, false, null);
     verify(wh, never()).getFileStatusesForLocation(part.getSd().getLocation());
   }
 }