HIVE-21214 : MoveTask : Use attemptId instead of file size for deduplication of files...
authorDeepak Jaiswal <djaiswal@apache.org>
Tue, 5 Feb 2019 22:06:19 +0000 (14:06 -0800)
committerDeepak Jaiswal <djaiswal@apache.org>
Wed, 6 Feb 2019 16:31:18 +0000 (08:31 -0800)
ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java

index 8937b43..b84b052 100644 (file)
@@ -1308,7 +1308,7 @@ public final class Utilities {
    *          filename to extract taskid from
    */
   public static String getTaskIdFromFilename(String filename) {
-    return getIdFromFilename(filename, FILE_NAME_TO_TASK_ID_REGEX);
+    return getTaskIdFromFilename(filename, FILE_NAME_TO_TASK_ID_REGEX);
   }
 
   /**
@@ -1319,10 +1319,19 @@ public final class Utilities {
    *          filename to extract taskid from
    */
   public static String getPrefixedTaskIdFromFilename(String filename) {
-    return getIdFromFilename(filename, FILE_NAME_PREFIXED_TASK_ID_REGEX);
+    return getTaskIdFromFilename(filename, FILE_NAME_PREFIXED_TASK_ID_REGEX);
   }
 
-  private static String getIdFromFilename(String filename, Pattern pattern) {
+  private static String getTaskIdFromFilename(String filename, Pattern pattern) {
+    return getIdFromFilename(filename, pattern, 1);
+  }
+
+  public static int getAttemptIdFromFilename(String filename) {
+    String attemptStr = getIdFromFilename(filename, FILE_NAME_PREFIXED_TASK_ID_REGEX, 3);
+    return Integer.parseInt(attemptStr.substring(1));
+  }
+
+  private static String getIdFromFilename(String filename, Pattern pattern, int group) {
     String taskId = filename;
     int dirEnd = filename.lastIndexOf(Path.SEPARATOR);
     if (dirEnd != -1) {
@@ -1334,7 +1343,7 @@ public final class Utilities {
       LOG.warn("Unable to get task id from file name: {}. Using last component {}"
           + " as task id.", filename, taskId);
     } else {
-      taskId = m.group(1);
+      taskId = m.group(group);
     }
     LOG.debug("TaskId for {} = {}", filename, taskId);
     return taskId;
@@ -1823,10 +1832,10 @@ public final class Utilities {
 
   private static FileStatus compareTempOrDuplicateFiles(FileSystem fs,
       FileStatus file, FileStatus existingFile) throws IOException {
-    // Compare the file sizes of all the attempt files for the same task, the largest win
-    // any attempt files could contain partial results (due to task failures or
-    // speculative runs), but the largest should be the correct one since the result
-    // of a successful run should never be smaller than a failed/speculative run.
+    // Pick the one with mewest attempt ID. For sanity, check the file sizes too.
+    // If the file size of newest attempt is less than that for older one,
+    // Throw an exception as it maybe a correctness issue causing it.
+    // This breaks speculative execution if it ends prematurely.
     FileStatus toDelete = null, toRetain = null;
 
     // "LOAD .. INTO" and "INSERT INTO" commands will generate files with
@@ -1847,12 +1856,26 @@ public final class Utilities {
       return existingFile;
     }
 
-    if (existingFile.getLen() >= file.getLen()) {
-      toDelete = file;
+    int existingFileAttemptId = getAttemptIdFromFilename(existingFile.getPath().getName());
+    int fileAttemptId = getAttemptIdFromFilename(file.getPath().getName());
+
+    long existingFileSz = getFileSizeRecursively(fs, existingFile);
+    long fileSz = getFileSizeRecursively(fs, file);
+    // Files may come in any order irrespective of their attempt IDs
+    if (existingFileAttemptId > fileAttemptId &&
+        existingFileSz >= fileSz) {
+      // keep existing
       toRetain = existingFile;
-    } else {
-      toDelete = existingFile;
+      toDelete = file;
+    } else if (existingFileAttemptId < fileAttemptId &&
+        existingFileSz <= fileSz) {
+      // keep file
       toRetain = file;
+      toDelete = existingFile;
+    } else {
+      throw new IOException(" File " + filePath +
+        " with newer attempt ID " + fileAttemptId + " is smaller than the file "
+        + existingFile.getPath() + " with older attempt ID " + existingFileAttemptId);
     }
     if (!fs.delete(toDelete.getPath(), true)) {
       throw new IOException(
@@ -1863,9 +1886,33 @@ public final class Utilities {
           + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length "
           + toRetain.getLen());
     }
+
     return toRetain;
   }
 
+  // This function recurisvely fetches the size of all the files in given directory
+  private static long getFileSizeRecursively(FileSystem fs, FileStatus src)
+  throws IOException {
+    long size = 0;
+    if (src.isDirectory()) {
+      LOG.debug(" src " + src.getPath() + " is a directory");
+      // This is a directory.
+      try {
+        FileStatus[] files = fs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
+        // Recursively fetch sizes of each file
+        for (FileStatus file : files) {
+          size += getFileSizeRecursively(fs, file);
+        }
+      } catch (IOException e) {
+        throw new IOException("Unable to fetch files in directory " + src.getPath());
+      }
+    } else {
+      size = src.getLen();
+      LOG.debug("src " + src.getPath() + " is a file of size " + size);
+    }
+    return size;
+  }
+
   public static boolean isCopyFile(String filename) {
     String taskId = filename;
     String copyFileSuffix = null;