[CARBONDATA-4338] Moving dropped partition data to trash master
author Mahesh Raju Somalaraju <mahesh.somalaraju@huawei.com>
Mon, 6 Jun 2022 11:22:28 +0000 (16:52 +0530)
committer Indhumathi27 <indhumathim27@gmail.com>
Tue, 19 Jul 2022 04:44:03 +0000 (10:14 +0530)
Why is this PR needed?
When a drop partition operation is performed, CarbonData modifies
only the table status file and does not delete the actual
partition folder, which contains the data and index files. To
comply with Hive behaviour, CarbonData should also delete
the dropped partition folder from storage (HDFS/OBS/etc.).
Before deleting it, CarbonData keeps a copy in the trash folder.
The user can restore it by checking the partition name and timestamp.

What changes were proposed in this PR?
Move the files of the dropped partition folder to the trash folder.

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #4276

core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
docs/configuration-parameters.md
docs/ddl-of-carbondata.md
docs/faq.md
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala

index 663f7d21cb20ab5a676f57337a76ee7bd129d25b..023137e815f133ac3152e03af7c0e4dc5235bdd0 100644 (file)
@@ -2886,5 +2886,15 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_ENABLE_MULTI_VERSION_TABLE_STATUS_DEFAULT = "false";
 
+  /**
+   * Property that controls whether the data of a dropped partition is moved to
+   * the trash on an ALTER DROP PARTITION operation.
+   * The feature is disabled by default; set this property to true to move the
+   * dropped partition data to the trash.
+   */
+  @CarbonProperty
+  public static final String CARBON_ENABLE_PARTITION_DATA_TRASH =
+      "carbon.enable.partitiondata.trash";
 
+  public static final String CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT = "false";
 }
index bfa9b949fc4e1bdd12023a7ee9cbea2684fc2083..e395b7a6221d1932c6e761d244cb02ea5d6de3ec 100644 (file)
@@ -206,10 +206,12 @@ public class CleanFilesUtil {
   /**
    * This method will delete all the empty partition folders starting from the table path
    */
-  private static void deleteEmptyPartitionFoldersRecursively(CarbonFile tablePath) {
+  public static void deleteEmptyPartitionFoldersRecursively(CarbonFile tablePath) {
     CarbonFile[] listOfFiles = tablePath.listFiles();
     if (listOfFiles.length == 0) {
       tablePath.delete();
+      // if parent file folder also empty then delete that too.
+      deleteEmptyPartitionFoldersRecursively(tablePath.getParentFile());
     } else {
       for (CarbonFile file: listOfFiles) {
         if (file.isDirectory() && file.getName().contains("=")) {
index 47d196b7d936fc9989b00973f977f698f5ec9896..6e6131b54176ebabceb0b80314fdc343aa662c59 100644 (file)
@@ -268,4 +268,45 @@ public final class TrashUtil {
       timeStampSubFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
       .SEGMENT_PREFIX + segmentNumber;
   }
+
+  /**
+   * This will give the complete path of the trash folder with the timestamp and the partition name
+   *
+   * @param tablePath          absolute table path
+   * @param timeStampSubFolder the timestamp for the clean files operation
+   * @param partitionName      partition name for which files are moved to the trash folder
+   */
+  public static String getCompleteTrashFolderPathForPartition(String tablePath,
+      long timeStampSubFolder, String partitionName) {
+    return CarbonTablePath.getTrashFolderPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+        + timeStampSubFolder + CarbonCommonConstants.FILE_SEPARATOR
+        + partitionName;
+  }
+
+  /**
+   * The below method copies dropped partition files to the trash folder.
+   *
+   * @param filesToCopy              absolute path of the files to copy to the trash folder
+   * @param trashFolderWithTimestamp trashfolderpath with complete timestamp and segment number
+   */
+  public static void copyPartitionDataToTrash(String filesToCopy, String trashFolderWithTimestamp) {
+    try {
+      if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+        FileFactory.mkdirs(trashFolderWithTimestamp);
+      }
+      // check if file exists before copying
+      if (FileFactory.isFileExist(filesToCopy)) {
+        CarbonFile folder = FileFactory.getCarbonFile(filesToCopy);
+        CarbonFile[] dataFiles = folder.listFiles();
+        for (CarbonFile carbonFile : dataFiles) {
+          copyFileToTrashFolder(carbonFile.getAbsolutePath(), trashFolderWithTimestamp);
+        }
+      } else {
+        LOGGER.warn("Folder not copied to trash as partition folder does not exist");
+      }
+    } catch (IOException e) {
+      // If file is already moved or not found then continue with other files
+      LOGGER.warn("Unable to copy file to trash folder as file not found.", e);
+    }
+  }
 }
index 39064d4ae9dcc122b028eb109696ac47678c8d2e..38b4f1123344370fa7dc1f43abfc7d853ebf15f7 100644 (file)
@@ -58,6 +58,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.lock.class | (none) | This specifies the implementation of ICarbonLock interface to be used for acquiring the locks in case of concurrent operations                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
 | carbon.data.file.version | V3 | This specifies carbondata file format version. Carbondata file format has evolved with time from V1 to V3 in terms of metadata storage and IO level pruning capabilities. You can find more details [here](https://carbondata.apache.org/file-structure-of-carbondata.html#carbondata-file-format).                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
 | spark.carbon.hive.schema.store | false | Carbondata currently supports 2 different types of metastores for storing schemas. This property specifies if Hive metastore is to be used for storing and retrieving table schemas                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+| carbon.enable.partitiondata.trash | false | When enabled, this property moves the dropped partition data to trash on ALTER TABLE DROP PARTITION operation. |
 
 ## Data Loading Configuration
 
index 3d04684c29a8dc6acdaba606bfb65bf613c1c969..ab2781f57971473e8a472b39a0b3ee1a2bb9dc49 100644 (file)
@@ -1107,6 +1107,7 @@ Users can specify which columns to include and exclude for local dictionary gene
   ALTER TABLE locationTable DROP PARTITION (country = 'US');
   ```
 
+   **NOTE:** Enable [carbon.enable.partitiondata.trash](./configuration-parameters.md#system-configuration) to move dropped partition data to trash during alter table DROP PARTITION.
 #### Insert OVERWRITE
 
   This command allows you to insert or load overwrite on a specific partition.
index cde3e9e4dab0274ea21e6dd5c1b4f39c09771ce5..0e28abc44892d9d3760653cb84e99c620c931bde 100644 (file)
@@ -31,6 +31,7 @@
 * [How to deal with the trailing task in query?](#How-to-deal-with-the-trailing-task-in-query)
 * [How to manage hybrid file format in carbondata table?](#How-to-manage-hybrid-file-format-in-carbondata-table)
 * [How to recover table status file if lost?](#How-to-recover-table-status-file-if-lost)
+* [Why deleted partition data still showing in file system?](#why-deleted-partition-data-still-showing-in-file-system)
 
 # TroubleShooting
 
@@ -481,4 +482,8 @@ TableStatusRecovery.main(args) --> args is of length two: 1. Database Name 2. Ta
 TableStatus Recovery tool cannot recover table status version files for the below two scenarios
 1. After compaction, if table status file is lost, cannot recover compacted commit transaction, as the lost version file only has merged load details.
 2. After Delete segment by Id/Date, if table status file is lost, cannot recover deleted segment commit transaction, as the lost version file only has the segment status as deleted.
-3. Table status recovery on materialized view table is not supported.
\ No newline at end of file
+3. Table status recovery on materialized view table is not supported.
+
+## Why deleted partition data still showing in file system
+By default, the data of a dropped partition is not physically removed from the table store until the table is dropped.
+Enable the carbon.enable.partitiondata.trash property to move the data of all dropped partitions to the trash during the ALTER TABLE DROP PARTITION operation itself.
index d7d317f4f31ca6ee46cbf983b7a05b52eb9d891d..8399a3aa1f883015c1bd23e341cf31c87df81e05 100644 (file)
@@ -31,13 +31,15 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.IndexStoreManager
-import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
 import org.apache.carbondata.events._
 import org.apache.carbondata.spark.rdd.CarbonDropPartitionRDD
 
@@ -106,6 +108,32 @@ case class CarbonAlterTableDropHivePartitionCommand(
             ifExists,
             purge,
             retainData).run(sparkSession)
+          val isPartitionDataTrashEnabled = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+              CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT).toBoolean
+          if (isPartitionDataTrashEnabled) {
+            // move  the partition files to trash folder which are dropped
+            val droppedPartitionNames = partitions.map { partition =>
+              partition.spec.map { specs => specs._1 + CarbonCommonConstants.EQUALS + specs._2 }
+            }
+            val timeStamp = System.currentTimeMillis()
+            droppedPartitionNames.zipWithIndex.foreach { partitionName =>
+              val droppedPartitionName = droppedPartitionNames(partitionName._2).mkString("/")
+              TrashUtil.copyPartitionDataToTrash(carbonPartitionsTobeDropped.get(partitionName._2),
+                TrashUtil.getCompleteTrashFolderPathForPartition(
+                  table.getTablePath,
+                  timeStamp,
+                  droppedPartitionName))
+            }
+            // Delete partition folder after copy to trash
+            carbonPartitionsTobeDropped.asScala.foreach(delPartition => {
+              val partitionPath = FileFactory.getCarbonFile(delPartition)
+              CarbonUtil.deleteFoldersAndFiles(partitionPath)
+            })
+            // Finally delete empty partition folders.
+            CleanFilesUtil.deleteEmptyPartitionFoldersRecursively(FileFactory
+              .getCarbonFile(table.getTablePath))
+          }
         }
       } catch {
         case e: Exception =>
index 0148d021401f11ead0ef297af012cd100c365b49..ab8082685931462c588b8b9457fa5a3c1ca05ae2 100644 (file)
 
 package org.apache.carbondata.spark.testsuite.standardpartition
 
+import java.io.File
 import java.nio.file.{Files, LinkOption, Paths}
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAll {
   // scalastyle:off lineLength
+  var count = 0
   override def beforeAll {
     dropTable
 
@@ -218,6 +222,146 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl
       Seq(Row(0)))
   }
 
+  test("dropping partition with moving data to trash") {
+    // Enable the trash feature only for this test. The finally block restores the default
+    // even when an assertion fails, so that other tests do not inherit the setting.
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH, "true")
+    try {
+      sql("drop table if exists dropPartition1")
+      sql(
+        """
+          | CREATE TABLE dropPartition1 (empno int, empname String, designation String,
+          |  workgroupcategory int, workgroupcategoryname String, deptno int,
+          |  projectjoindate Timestamp, projectenddate Date,attendance int,
+          |  utilization int,salary int)
+          | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+          | STORED AS carbondata
+        """.stripMargin)
+      // load the same file four times
+      (1 to 4).foreach { _ =>
+        sql(
+          s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE dropPartition1 OPTIONS
+             |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+      }
+      checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(40)))
+      // dropped deptname partition -> row count expected after the drop
+      val dropSteps = Seq(
+        "Learning" -> 32,
+        "configManagement" -> 28,
+        "network" -> 16,
+        "protocol" -> 8,
+        "security" -> 0)
+      dropSteps.foreach { case (dept, remainingRows) =>
+        sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='$dept')""")
+        checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(remainingRows)))
+      }
+      val table = CarbonEnv.getCarbonTable(Option("default"), "dropPartition1")(sqlContext
+        .sparkSession)
+      val tablePath = table.getTablePath
+      // none of the dropped partition folders may remain directly under the table path
+      dropSteps.foreach { case (dept, _) =>
+        val leftOver = FileFactory.getCarbonFile(tablePath).listFiles().filter {
+          file => file.getName.equalsIgnoreCase(s"deptname=$dept")
+        }
+        assert(leftOver.length == 0)
+      }
+    } finally {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+        CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT)
+      sql("drop table if exists dropPartition1")
+    }
+  }
+
+  test("dropping partition with moving data to trash and count check") {
+    // Enable the trash feature only for this test. The finally block restores the default
+    // even when an assertion fails, so that other tests do not inherit the setting.
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+      "true")
+    try {
+      sql("drop table if exists dropPartition2")
+      sql(
+        """
+          | CREATE TABLE dropPartition2 (empno int, empname String, designation String,
+          |  workgroupcategory int, workgroupcategoryname String, deptno int,
+          |  projectjoindate Timestamp, projectenddate Date,attendance int,
+          |  utilization int,salary int)
+          | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+          | STORED AS carbondata
+        """.stripMargin)
+      (1 to 2).foreach { _ =>
+        sql(
+          s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE dropPartition2 OPTIONS
+             |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+      }
+      val table = CarbonEnv.getCarbonTable(Option("default"), "dropPartition2")(sqlContext
+        .sparkSession)
+      val tablePath = table.getTablePath
+
+      // true when a folder for the given deptname partition sits directly under the table path
+      def partitionFolderExists(dept: String): Boolean = {
+        FileFactory.getCarbonFile(tablePath).listFiles().exists {
+          file => file.getName.equalsIgnoreCase(s"deptname=$dept")
+        }
+      }
+
+      // check the partition folders before dropping the partitions
+      assert(partitionFolderExists("Learning"))
+      assert(partitionFolderExists("configManagement"))
+      // check the partition folders after dropping the partitions
+      sql(s"""ALTER TABLE dropPartition2 DROP PARTITION(deptname='Learning')""")
+      sql(s"""ALTER TABLE dropPartition2 DROP PARTITION(deptname='configManagement')""")
+      assert(!partitionFolderExists("Learning"))
+      assert(!partitionFolderExists("configManagement"))
+      // check the file count at trash folder
+      val trashFolderPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR +
+                            CarbonTablePath.TRASH_DIR
+      assert(FileFactory.isFileExist(trashFolderPath))
+      // the helper accumulates into the shared counter, so start from zero
+      count = 0
+      val list = getFileCountInTrashFolder(trashFolderPath)
+      // carbondata files are added to the trash
+      assert(list > 0)
+    } finally {
+      CarbonProperties.getInstance().addProperty(
+        CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+        CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT)
+      sql("drop table if exists dropPartition2")
+    }
+  }
+
+  /**
+   * Counts the regular files below the given directory, descending into sub directories,
+   * adds that number to the suite level `count` and returns the updated `count`.
+   *
+   * @param dirPath path of the directory to scan
+   * @return the value of `count` after the files found have been added
+   */
+  def getFileCountInTrashFolder(dirPath: String) : Int = {
+    // number of regular files under dir; a path that cannot be listed contributes nothing
+    def filesUnder(dir: File): Int = {
+      val children = dir.listFiles()
+      if (children == null) {
+        0
+      } else {
+        children.map { child =>
+          val self = if (child.isFile) 1 else 0
+          val nested = if (child.isDirectory()) filesUnder(new File(child.getAbsolutePath())) else 0
+          self + nested
+        }.sum
+      }
+    }
+    count = count + filesUnder(new File(dirPath))
+    count
+  }
+
   test("test dropping on partition table for int partition column") {
     sql(
       """