[CARBONDATA-3235] Fixed Alter Table Rename
authornamanrastogi <naman.rastogi.52@gmail.com>
Wed, 23 Jan 2019 12:27:35 +0000 (17:57 +0530)
committerkumarvishal09 <kumarvishal1802@gmail.com>
Mon, 28 Jan 2019 13:07:54 +0000 (18:37 +0530)
Fixed negative scenario: Alter Table Rename Table Fail

Problem: When tabe rename is success in hive, for failed in carbon data store, it would throw exception, but would not go back and undo rename in hive.

Solution: A flag to keep check if hive rename has already executed, and of the code breaks after hive rename is done, go back and undo the hive rename.

This closes #3098

integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala

index 01698c9..33f3cd9 100644 (file)
@@ -43,10 +43,12 @@ private[sql] case class CarbonAlterTableRenameCommand(
 
   override def processMetadata(sparkSession: SparkSession): Seq[Nothing] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier
-    val newTableIdentifier = alterTableRenameModel.newTableIdentifier
-    val oldDatabaseName = oldTableIdentifier.database
+    val oldTableName = alterTableRenameModel.oldTableIdentifier.table.toLowerCase
+    val newTableName = alterTableRenameModel.newTableIdentifier.table.toLowerCase
+    val oldDatabaseName = alterTableRenameModel.oldTableIdentifier.database
       .getOrElse(sparkSession.catalog.currentDatabase)
+    val oldTableIdentifier = TableIdentifier(oldTableName, Some(oldDatabaseName))
+    val newTableIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
     setAuditTable(oldDatabaseName, oldTableIdentifier.table)
     setAuditInfo(Map("newName" -> alterTableRenameModel.newTableIdentifier.table))
     val newDatabaseName = newTableIdentifier.database
@@ -59,8 +61,6 @@ private[sql] case class CarbonAlterTableRenameCommand(
       throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " +
                                                 s"already exists")
     }
-    val oldTableName = oldTableIdentifier.table.toLowerCase
-    val newTableName = newTableIdentifier.table.toLowerCase
     LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
     val relation: CarbonRelation =
@@ -108,8 +108,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
         dataMapSchemaList.addAll(indexSchemas)
       }
       // invalid data map for the old table, see CARBON-1690
-      val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-      DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
+      val oldAbsoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      DataMapStoreManager.getInstance().clearDataMaps(oldAbsoluteTableIdentifier)
       // get the latest carbon table and check for column existence
       val operationContext = new OperationContext
       // TODO: Pass new Table Path in pre-event.
@@ -125,7 +125,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
       schemaEvolutionEntry.setTableName(newTableName)
       timeStamp = System.currentTimeMillis()
       schemaEvolutionEntry.setTime_stamp(timeStamp)
-      val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
+      val newCarbonTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
         newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
       val oldIdentifier = TableIdentifier(oldTableName, Some(oldDatabaseName))
       val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
@@ -133,17 +133,17 @@ private[sql] case class CarbonAlterTableRenameCommand(
       var partitions: Seq[CatalogTablePartition] = Seq.empty
       if (carbonTable.isHivePartitionTable) {
         partitions =
-          sparkSession.sessionState.catalog.listPartitions(oldIdentifier)
+          sparkSession.sessionState.catalog.listPartitions(oldTableIdentifier)
       }
-      sparkSession.catalog.refreshTable(oldIdentifier.quotedString)
+      sparkSession.catalog.refreshTable(oldTableIdentifier.quotedString)
       sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterTableRename(
-          oldIdentifier,
-          newIdentifier,
-        oldTableIdentifier.getTablePath)
+          oldTableIdentifier,
+          newTableIdentifier,
+        oldAbsoluteTableIdentifier.getTablePath)
       hiveRenameSuccess = true
 
       metastore.updateTableSchemaForAlter(
-        newTableIdentifier,
+        newCarbonTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
         tableInfo,
         schemaEvolutionEntry,
@@ -157,11 +157,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
         carbonTable,
         alterTableRenameModel,
-        oldTableIdentifier.getTablePath,
+        oldAbsoluteTableIdentifier.getTablePath,
         sparkSession)
       OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext)
 
-      sparkSession.catalog.refreshTable(newIdentifier.quotedString)
+      sparkSession.catalog.refreshTable(newTableIdentifier.quotedString)
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
     } catch {
       case e: ConcurrentOperationException =>
@@ -171,7 +171,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
           sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterTableRename(
             newTableIdentifier,
             oldTableIdentifier,
-            carbonTable.getAbsoluteTableIdentifier.getTableName)
+            carbonTable.getAbsoluteTableIdentifier.getTablePath)
         }
         if (carbonTable != null) {
           AlterTableUtil.revertRenameTableChanges(