HIVE-21177: ACID: When there are no delete deltas skip finding min max keys (Eugene...
authorEugene Koifman <ekoifman@apache.org>
Thu, 7 Feb 2019 17:49:19 +0000 (09:49 -0800)
committerEugene Koifman <ekoifman@apache.org>
Thu, 7 Feb 2019 17:49:32 +0000 (09:49 -0800)
ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java

index 9b51847..3961baa 100644 (file)
@@ -83,6 +83,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import javax.annotation.concurrent.Immutable;
 import java.nio.charset.Charset;
 
 /**
@@ -435,16 +436,16 @@ public class AcidUtils {
   }
 
   public static final class DirectoryImpl implements Directory {
-    private final List<FileStatus> abortedDirectories;
+    private final List<Path> abortedDirectories;
     private final boolean isBaseInRawFormat;
     private final List<HdfsFileStatusWithId> original;
-    private final List<FileStatus> obsolete;
+    private final List<Path> obsolete;
     private final List<ParsedDelta> deltas;
     private final Path base;
 
-    public DirectoryImpl(List<FileStatus> abortedDirectories,
+    public DirectoryImpl(List<Path> abortedDirectories,
         boolean isBaseInRawFormat, List<HdfsFileStatusWithId> original,
-        List<FileStatus> obsolete, List<ParsedDelta> deltas, Path base) {
+        List<Path> obsolete, List<ParsedDelta> deltas, Path base) {
       this.abortedDirectories = abortedDirectories == null ?
           Collections.emptyList() : abortedDirectories;
       this.isBaseInRawFormat = isBaseInRawFormat;
@@ -475,12 +476,12 @@ public class AcidUtils {
     }
 
     @Override
-    public List<FileStatus> getObsolete() {
+    public List<Path> getObsolete() {
       return obsolete;
     }
 
     @Override
-    public List<FileStatus> getAbortedDirectories() {
+    public List<Path> getAbortedDirectories() {
       return abortedDirectories;
     }
   }
@@ -740,7 +741,7 @@ public class AcidUtils {
     /**
      * Get the list of base and delta directories that are valid and not
      * obsolete.  Not {@code null}.  List must be sorted in a specific way.
-     * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta)}
+     * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight)}
      * for details.
      * @return the minimal list of current directories
      */
@@ -752,13 +753,13 @@ public class AcidUtils {
      * list of original files, bases, and deltas that have been replaced by
      * more up to date ones.  Not {@code null}.
      */
-    List<FileStatus> getObsolete();
+    List<Path> getObsolete();
 
     /**
      * Get the list of directories that has nothing but aborted transactions.
      * @return the list of aborted directories
      */
-    List<FileStatus> getAbortedDirectories();
+    List<Path> getAbortedDirectories();
   }
   /**
    * Since version 3 but prior to version 4, format of a base is "base_X" where X is a writeId.
@@ -804,18 +805,48 @@ public class AcidUtils {
           Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length())), path);
     }
   }
+
   /**
-   * Immutable
+   * In addition to {@link ParsedDeltaLight} this knows if the data is in raw format, i.e. doesn't
+   * have acid metadata columns embedded in the files.  To determine this in some cases
+   * requires looking at the footer of the data file which can be expensive so if this info is
+   * not needed {@link ParsedDeltaLight} should be used.
    */
-  public static final class ParsedDelta implements Comparable<ParsedDelta> {
-    private final long minWriteId;
-    private final long maxWriteId;
-    private final FileStatus path;
+  @Immutable
+  public static final class ParsedDelta extends ParsedDeltaLight {
+    private final boolean isRawFormat;
+    /**
+     * for pre 1.3.x delta files
+     */
+    private ParsedDelta(long min, long max, Path path, boolean isDeleteDelta,
+        boolean isRawFormat, long visibilityTxnId) {
+      this(min, max, path, -1, isDeleteDelta, isRawFormat, visibilityTxnId);
+    }
+    private ParsedDelta(long min, long max, Path path, int statementId,
+        boolean isDeleteDelta, boolean isRawFormat, long visibilityTxnId) {
+      super(min, max, path, statementId, isDeleteDelta, visibilityTxnId);
+      this.isRawFormat = isRawFormat;
+    }
+    /**
+     * Files w/o Acid meta columns embedded in the file. See {@link AcidBaseFileType#ORIGINAL_BASE}
+     */
+    public boolean isRawFormat() {
+      return isRawFormat;
+    }
+  }
+  /**
+   * This encapsulates info obtained form the file path.
+   * See also {@link ParsedDelta}.
+   */
+  @Immutable
+  public static class ParsedDeltaLight implements Comparable<ParsedDeltaLight> {
+    final long minWriteId;
+    final long maxWriteId;
+    final Path path;
     //-1 is for internal (getAcidState()) purposes and means the delta dir
     //had no statement ID
-    private final int statementId;
-    private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
-    private final boolean isRawFormat;
+    final int statementId;
+    final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
     /**
      * transaction Id of txn which created this delta.  This dir should be considered
      * invisible unless this txn is committed
@@ -823,23 +854,22 @@ public class AcidUtils {
      * TODO: define TransactionallyVisible interface - add getVisibilityTxnId() etc and all comments
      * use in {@link ParsedBase}, {@link ParsedDelta}, {@link AcidInputFormat.Options}, AcidInputFormat.DeltaMetaData etc
      */
-    private final long visibilityTxnId;
-    /**
-     * for pre 1.3.x delta files
-     */
-    private ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta,
-        boolean isRawFormat, long visibilityTxnId) {
-      this(min, max, path, -1, isDeleteDelta, isRawFormat, visibilityTxnId);
+    final long visibilityTxnId;
+
+    public static ParsedDeltaLight parse(Path deltaDir) {
+      //passing isRawFormat=false is bogus.  This is just to parse the file name.
+      ParsedDelta pd = parsedDelta(deltaDir, false);
+      return new ParsedDeltaLight(pd.getMinWriteId(), pd.getMaxWriteId(), deltaDir,
+          pd.getStatementId(), pd.isDeleteDelta(), pd.getVisibilityTxnId());
     }
-    private ParsedDelta(long min, long max, FileStatus path, int statementId,
-        boolean isDeleteDelta, boolean isRawFormat, long visibilityTxnId) {
+
+    private ParsedDeltaLight(long min, long max, Path path, int statementId,
+        boolean isDeleteDelta, long visibilityTxnId) {
       this.minWriteId = min;
       this.maxWriteId = max;
       this.path = path;
       this.statementId = statementId;
       this.isDeleteDelta = isDeleteDelta;
-      this.isRawFormat = isRawFormat;
-      assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format";
       this.visibilityTxnId = visibilityTxnId;
     }
 
@@ -852,7 +882,7 @@ public class AcidUtils {
     }
 
     public Path getPath() {
-      return path.getPath();
+      return path;
     }
 
     public int getStatementId() {
@@ -862,16 +892,18 @@ public class AcidUtils {
     public boolean isDeleteDelta() {
       return isDeleteDelta;
     }
-    /**
-     * Files w/o Acid meta columns embedded in the file. See {@link AcidBaseFileType#ORIGINAL_BASE}
-     */
-    public boolean isRawFormat() {
-      return isRawFormat;
-    }
     public long getVisibilityTxnId() {
       return visibilityTxnId;
     }
     /**
+     * Only un-compacted delta_x_y (x != y) (created by streaming ingest with batch size > 1)
+     * may contain a {@link OrcAcidUtils#getSideFile(Path)}.
+     * @return
+     */
+    boolean mayContainSideFile() {
+      return !isDeleteDelta() && getMinWriteId() != getMaxWriteId() && getVisibilityTxnId() <= 0;
+    }
+    /**
      * Compactions (Major/Minor) merge deltas/bases but delete of old files
      * happens in a different process; thus it's possible to have bases/deltas with
      * overlapping writeId boundaries.  The sort order helps figure out the "best" set of files
@@ -879,7 +911,7 @@ public class AcidUtils {
      * This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts before delta_5_10 (and delta_11_20)
      */
     @Override
-    public int compareTo(ParsedDelta parsedDelta) {
+    public int compareTo(ParsedDeltaLight parsedDelta) {
       if (minWriteId != parsedDelta.minWriteId) {
         if (minWriteId < parsedDelta.minWriteId) {
           return -1;
@@ -990,9 +1022,9 @@ public class AcidUtils {
     return parsedDelta(deltaDir, DELTA_PREFIX, fs); // default prefix is delta_prefix
   }
 
-  private static ParsedDelta parseDelta(FileStatus path, String deltaPrefix, FileSystem fs)
+  private static ParsedDelta parseDelta(Path path, String deltaPrefix, FileSystem fs)
     throws IOException {
-    ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix, fs);
+    ParsedDelta p = parsedDelta(path, deltaPrefix, fs);
     boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
     return new ParsedDelta(p.getMinWriteId(),
         p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat(), p.visibilityTxnId);
@@ -1132,9 +1164,9 @@ public class AcidUtils {
     // The following 'deltas' includes all kinds of delta files including insert & delete deltas.
     final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
     List<ParsedDelta> working = new ArrayList<ParsedDelta>();
-    List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
-    final List<FileStatus> obsolete = new ArrayList<FileStatus>();
-    final List<FileStatus> abortedDirectories = new ArrayList<>();
+    List<Path> originalDirectories = new ArrayList<>();
+    final List<Path> obsolete = new ArrayList<>();
+    final List<Path> abortedDirectories = new ArrayList<>();
     List<HdfsFileStatusWithId> childrenWithId = null;
     Boolean val = useFileIds.value;
     if (val == null || val) {
@@ -1169,9 +1201,9 @@ public class AcidUtils {
     if (bestBase.status != null) {
       // Add original files to obsolete list if any
       for (HdfsFileStatusWithId fswid : original) {
-        obsolete.add(fswid.getFileStatus());
+        obsolete.add(fswid.getFileStatus().getPath());
       }
-      // Add original direcotries to obsolete list if any
+      // Add original directories to obsolete list if any
       obsolete.addAll(originalDirectories);
       // remove the entries so we don't get confused later and think we should
       // use them.
@@ -1180,7 +1212,7 @@ public class AcidUtils {
     } else {
       // Okay, we're going to need these originals.  Recurse through them and figure out what we
       // really need.
-      for (FileStatus origDir : originalDirectories) {
+      for (Path origDir : originalDirectories) {
         findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true);
       }
     }
@@ -1308,9 +1340,9 @@ public class AcidUtils {
 
   }
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
-      ValidWriteIdList writeIdList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
-      List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
-      boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties,
+      ValidWriteIdList writeIdList, List<ParsedDelta> working, List<Path> originalDirectories,
+      List<HdfsFileStatusWithId> original, List<Path> obsolete, TxnBase bestBase,
+      boolean ignoreEmptyFiles, List<Path> aborted, Map<String, String> tblproperties,
       FileSystem fs, ValidTxnList validTxnList) throws IOException {
     Path p = child.getPath();
     String fn = p.getName();
@@ -1322,7 +1354,7 @@ public class AcidUtils {
     }
     if (fn.startsWith(BASE_PREFIX)) {
       ParsedBase parsedBase = ParsedBase.parseBase(p);
-      if(!isDirUsable(child, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
+      if(!isDirUsable(child.getPath(), parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
         return;
       }
       final long writeId = parsedBase.getWriteId();
@@ -1338,22 +1370,22 @@ public class AcidUtils {
         }
       } else if (bestBase.writeId < writeId) {
         if(isValidBase(parsedBase, writeIdList, fs)) {
-          obsolete.add(bestBase.status);
+          obsolete.add(bestBase.status.getPath());
           bestBase.status = child;
           bestBase.writeId = writeId;
         }
       } else {
-        obsolete.add(child);
+        obsolete.add(child.getPath());
       }
     } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) {
       String deltaPrefix = fn.startsWith(DELTA_PREFIX)  ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
-      ParsedDelta delta = parseDelta(child, deltaPrefix, fs);
-      if(!isDirUsable(child, delta.getVisibilityTxnId(), aborted, validTxnList)) {
+      ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs);
+      if(!isDirUsable(child.getPath(), delta.getVisibilityTxnId(), aborted, validTxnList)) {
         return;
       }
       if(ValidWriteIdList.RangeResponse.ALL ==
           writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
-        aborted.add(child);
+        aborted.add(child.getPath());
       }
       else if (writeIdList.isWriteIdRangeValid(
           delta.minWriteId, delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) {
@@ -1364,16 +1396,16 @@ public class AcidUtils {
       // do this until we have determined there is no base.  This saves time.  Plus,
       // it is possible that the cleaner is running and removing these original files,
       // in which case recursing through them could cause us to get an error.
-      originalDirectories.add(child);
+      originalDirectories.add(child.getPath());
     }
   }
   /**
    * checks {@code visibilityTxnId} to see if {@code child} is committed in current snapshot
    */
-  private static boolean isDirUsable(FileStatus child, long visibilityTxnId,
-      List<FileStatus> aborted, ValidTxnList validTxnList) {
+  private static boolean isDirUsable(Path child, long visibilityTxnId,
+      List<Path> aborted, ValidTxnList validTxnList) {
     if(validTxnList == null) {
-      throw new IllegalArgumentException("No ValidTxnList for " + child.getPath());
+      throw new IllegalArgumentException("No ValidTxnList for " + child);
     }
     if(!validTxnList.isTxnValid(visibilityTxnId)) {
       boolean isAborted = validTxnList.isTxnAborted(visibilityTxnId);
@@ -1411,19 +1443,18 @@ public class AcidUtils {
   /**
    * Find the original files (non-ACID layout) recursively under the partition directory.
    * @param fs the file system
-   * @param stat the directory to add
+   * @param dir the directory to add
    * @param original the list of original files
    * @throws IOException
    */
-  public static void findOriginals(FileSystem fs, FileStatus stat,
+  public static void findOriginals(FileSystem fs, Path dir,
       List<HdfsFileStatusWithId> original, Ref<Boolean> useFileIds,
       boolean ignoreEmptyFiles, boolean recursive) throws IOException {
-    assert stat.isDir();
     List<HdfsFileStatusWithId> childrenWithId = null;
     Boolean val = useFileIds.value;
     if (val == null || val) {
       try {
-        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, stat.getPath(), hiddenFileFilter);
+        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, dir, hiddenFileFilter);
         if (val == null) {
           useFileIds.value = true;
         }
@@ -1438,7 +1469,8 @@ public class AcidUtils {
       for (HdfsFileStatusWithId child : childrenWithId) {
         if (child.getFileStatus().isDirectory()) {
           if (recursive) {
-            findOriginals(fs, child.getFileStatus(), original, useFileIds, ignoreEmptyFiles, true);
+            findOriginals(fs, child.getFileStatus().getPath(), original, useFileIds,
+                ignoreEmptyFiles, true);
           }
         } else {
           if(!ignoreEmptyFiles || child.getFileStatus().getLen() > 0) {
@@ -1447,11 +1479,11 @@ public class AcidUtils {
         }
       }
     } else {
-      List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter);
+      List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, dir, hiddenFileFilter);
       for (FileStatus child : children) {
-        if (child.isDir()) {
+        if (child.isDirectory()) {
           if (recursive) {
-            findOriginals(fs, child, original, useFileIds, ignoreEmptyFiles, true);
+            findOriginals(fs, child.getPath(), original, useFileIds, ignoreEmptyFiles, true);
           }
         } else {
           if(!ignoreEmptyFiles || child.getLen() > 0) {
@@ -1494,7 +1526,9 @@ public class AcidUtils {
   public static boolean isDeleteDelta(Path p) {
     return p.getName().startsWith(DELETE_DELTA_PREFIX);
   }
-
+  public static boolean isInsertDelta(Path p) {
+    return p.getName().startsWith(DELTA_PREFIX);
+  }
   public static boolean isTransactionalTable(CreateTableDesc table) {
     if (table == null || table.getTblProps() == null) {
       return false;
@@ -1661,6 +1695,16 @@ public class AcidUtils {
    * @param file - data file to read/compute splits on
    */
   public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOException {
+    Path acidDir = file.getPath().getParent(); //should be base_x or delta_x_y_
+    if(AcidUtils.isInsertDelta(acidDir)) {
+      ParsedDeltaLight pd = ParsedDeltaLight.parse(acidDir);
+      if(!pd.mayContainSideFile()) {
+        return file.getLen();
+      }
+    }
+    else {
+      return file.getLen();
+    }
     Path lengths = OrcAcidUtils.getSideFile(file.getPath());
     if(!fs.exists(lengths)) {
       /**
@@ -2009,11 +2053,33 @@ public class AcidUtils {
     }
 
     /**
-     * Checks if the files in base/delta dir are a result of Load Data statement and thus do not
-     * have ROW_IDs embedded in the data.
+     * Checks if the files in base/delta dir are a result of Load Data/Add Partition statement
+     * and thus do not have ROW_IDs embedded in the data.
+     * This is only meaningful for full CRUD tables - Insert-only tables have all their data
+     * in raw format by definition.
      * @param baseOrDeltaDir base or delta file.
      */
     public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException {
+      //todo: this could be optimized - for full CRUD table only base_x and delta_x_x could have
+      // files in raw format delta_x_y (x != y) whether from streaming ingested or compaction
+      // must be native Acid format by definition
+      if(isDeleteDelta(baseOrDeltaDir)) {
+        return false;
+      }
+      if(isInsertDelta(baseOrDeltaDir)) {
+        ParsedDeltaLight pd = ParsedDeltaLight.parse(baseOrDeltaDir);
+        if(pd.getMinWriteId() != pd.getMaxWriteId()) {
+          //must be either result of streaming or compaction
+          return false;
+        }
+      }
+      else {
+        //must be base_x
+        if(isCompactedBase(ParsedBase.parseBase(baseOrDeltaDir), fs)) {
+          return false;
+        }
+      }
+      //if here, have to check the files
       Path dataFile = chooseFile(baseOrDeltaDir, fs);
       if (dataFile == null) {
         //directory is empty or doesn't have any that could have been produced by load data
index 720dbe5..ca25449 100644 (file)
@@ -1206,7 +1206,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
             context.conf.getBoolean("mapred.input.dir.recursive", false));
         List<HdfsFileStatusWithId> originals = new ArrayList<>();
         List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
-        AcidUtils.findOriginals(fs, fs.getFileStatus(dir), originals, useFileIds, true, isRecursive);
+        AcidUtils.findOriginals(fs, dir, originals, useFileIds, true, isRecursive);
         for (HdfsFileStatusWithId fileId : originals) {
           baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
         }
index 7c4bc4d..62a1061 100644 (file)
@@ -1107,6 +1107,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           throw new IllegalStateException(delta + " is not delete delta and is not compacting.");
         }
         ReaderKey key = new ReaderKey();
+        //todo: only need to know isRawFormat if compacting for acid V2 and V2 should normally run
+        //in vectorized mode - i.e. this is not a significant perf overhead vs ParsedDeltaLight
         AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta, delta.getFileSystem(conf));
         if(deltaDir.isRawFormat()) {
           assert !deltaDir.isDeleteDelta() : delta.toString();
@@ -1228,8 +1230,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
                 parent);
           }
           else {
-            AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX,
-              parent.getFileSystem(conf));
+            AcidUtils.ParsedDeltaLight pd = AcidUtils.ParsedDeltaLight.parse(parent);
             return new TransactionMetaData(pd.getMinWriteId(), parent, pd.getStatementId());
           }
         }
index 06b0209..57eb506 100644 (file)
@@ -226,7 +226,7 @@ public class Cleaner extends MetaStoreCompactorThread {
           throws IOException, NoSuchObjectException {
     Path locPath = new Path(location);
     AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList);
-    List<FileStatus> obsoleteDirs = dir.getObsolete();
+    List<Path> obsoleteDirs = dir.getObsolete();
     /**
      * add anything in 'dir'  that only has data from aborted transactions - no one should be
      * trying to read anything in that dir (except getAcidState() that only reads the name of
@@ -239,11 +239,11 @@ public class Cleaner extends MetaStoreCompactorThread {
     obsoleteDirs.addAll(dir.getAbortedDirectories());
     List<Path> filesToDelete = new ArrayList<>(obsoleteDirs.size());
     StringBuilder extraDebugInfo = new StringBuilder("[");
-    for (FileStatus stat : obsoleteDirs) {
-      filesToDelete.add(stat.getPath());
-      extraDebugInfo.append(stat.getPath().getName()).append(",");
-      if(!FileUtils.isPathWithinSubtree(stat.getPath(), locPath)) {
-        LOG.info(idWatermark(ci) + " found unexpected file: " + stat.getPath());
+    for (Path stat : obsoleteDirs) {
+      filesToDelete.add(stat);
+      extraDebugInfo.append(stat.getName()).append(",");
+      if(!FileUtils.isPathWithinSubtree(stat, locPath)) {
+        LOG.info(idWatermark(ci) + " found unexpected file: " + stat);
       }
     }
     extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
index cde47da..f52b023 100644 (file)
@@ -339,13 +339,8 @@ public class CompactorMR {
   }
 
   /**
-   * 
-   * @param conf
-   * @param t
-   * @param p
    * @param sd (this is the resolved StorageDescriptor, i.e. resolved to table or partition)
    * @param writeIds (valid write ids used to filter rows while they're being read for compaction)
-   * @param ci
    * @throws IOException
    */
   private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
@@ -497,8 +492,9 @@ public class CompactorMR {
    * We need each final bucket file soreted by original write id (ascending), bucket (ascending) and row id (ascending). 
    * (current write id will be the same as original write id). 
    * We will be achieving the ordering via a custom split grouper for compactor.
-   * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars.SPLIT_GROUPING_MODE} for the config description.
-   * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorGroups} for details on the mechanism.
+   * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description.
+   * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)}
+   *  for details on the mechanism.
    */
   private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t, StorageDescriptor sd) {
     StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" (");
@@ -547,7 +543,8 @@ public class CompactorMR {
   /**
    * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn.
    * Since the temp table is a non-transactional table, it has file names in the "original" format.
-   * Also, due to split grouping in {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorGroups}, 
+   * Also, due to split grouping in
+   * {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)},
    * we will end up with one file per bucket.
    */
   private void commitCrudMajorCompaction(Table t, String from, String tmpTableName, String to, Configuration conf,
@@ -822,11 +819,7 @@ public class CompactorMR {
   // Remove the directories for aborted transactions only
   private void removeFilesForMmTable(HiveConf conf, Directory dir) throws IOException {
     // For MM table, we only want to delete delta dirs for aborted txns.
-    List<FileStatus> abortedDirs = dir.getAbortedDirectories();
-    List<Path> filesToDelete = new ArrayList<>(abortedDirs.size());
-    for (FileStatus stat : abortedDirs) {
-      filesToDelete.add(stat.getPath());
-    }
+    List<Path> filesToDelete = dir.getAbortedDirectories();
     if (filesToDelete.size() < 1) {
       return;
     }
index 02fde22..c5faec5 100644 (file)
@@ -115,7 +115,7 @@ public class TestAcidUtils {
         AcidUtils.ParsedBase.parseBase(new Path("/tmp/base_000123")).getWriteId());
     assertEquals(0,
         AcidUtils.ParsedBase.parseBase(new Path("/tmp/base_000123")).getVisibilityTxnId());
-    Path dir = new Path("/tmp/tbl");
+    Path dir = new Path("mock:/tmp/");
     AcidOutputFormat.Options opts =
         AcidUtils.parseBaseOrDeltaBucketFilename(new Path(dir, "base_567/bucket_123"),
             conf);
@@ -202,12 +202,12 @@ public class TestAcidUtils {
         AcidUtils.getAcidState(new MockPath(fs,
             "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
     assertEquals(null, dir.getBaseDirectory());
-    List<FileStatus> obsolete = dir.getObsolete();
+    List<Path> obsolete = dir.getObsolete();
     assertEquals(2, obsolete.size());
     assertEquals("mock:/tbl/part1/delta_025_025",
-        obsolete.get(0).getPath().toString());
+        obsolete.get(0).toString());
     assertEquals("mock:/tbl/part1/delta_029_029",
-        obsolete.get(1).getPath().toString());
+        obsolete.get(1).toString());
     List<HdfsFileStatusWithId> result = dir.getOriginalFiles();
     assertEquals(5, result.size());
     assertEquals("mock:/tbl/part1/000000_0", result.get(0).getFileStatus().getPath().toString());
@@ -246,13 +246,13 @@ public class TestAcidUtils {
         AcidUtils.getAcidState(new MockPath(fs,
             "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
     assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
-    List<FileStatus> obsolete = dir.getObsolete();
+    List<Path> obsolete = dir.getObsolete();
     assertEquals(5, obsolete.size());
-    assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).getPath().toString());
-    assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).getPath().toString());
-    assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(2).getPath().toString());
-    assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(3).getPath().toString());
-    assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(4).getPath().toString());
+    assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString());
+    assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString());
+    assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(2).toString());
+    assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(3).toString());
+    assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(4).toString());
     assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
     assertEquals(1, deltas.size());
@@ -276,9 +276,9 @@ public class TestAcidUtils {
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:150:" + Long.MAX_VALUE + ":"));
     // Obsolete list should include the two original bucket files, and the old base dir
-    List<FileStatus> obsolete = dir.getObsolete();
+    List<Path> obsolete = dir.getObsolete();
     assertEquals(3, obsolete.size());
-    assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).toString());
     assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
   }
 
@@ -299,10 +299,10 @@ public class TestAcidUtils {
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
-    List<FileStatus> obsolete = dir.getObsolete();
+    List<Path> obsolete = dir.getObsolete();
     assertEquals(2, obsolete.size());
-    assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).getPath().toString());
-    assertEquals("mock:/tbl/part1/delta_0060_60", obsolete.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).toString());
+    assertEquals("mock:/tbl/part1/delta_0060_60", obsolete.get(1).toString());
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(4, delts.size());
     assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString());
@@ -336,13 +336,13 @@ public class TestAcidUtils {
     AcidUtils.Directory dir
             = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
-    List<FileStatus> obsolete = dir.getObsolete();
+    List<Path> obsolete = dir.getObsolete();
     assertEquals(5, obsolete.size());
-    assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).getPath().toString());
-    assertEquals("mock:/tbl/part1/delta_058_58", obsolete.get(1).getPath().toString());
-    assertEquals("mock:/tbl/part1/delta_0060_60_1", obsolete.get(2).getPath().toString());
-    assertEquals("mock:/tbl/part1/delta_0060_60_4", obsolete.get(3).getPath().toString());
-    assertEquals("mock:/tbl/part1/delta_0060_60_7", obsolete.get(4).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(0).toString());
+    assertEquals("mock:/tbl/part1/delta_058_58", obsolete.get(1).toString());
+    assertEquals("mock:/tbl/part1/delta_0060_60_1", obsolete.get(2).toString());
+    assertEquals("mock:/tbl/part1/delta_0060_60_4", obsolete.get(3).toString());
+    assertEquals("mock:/tbl/part1/delta_0060_60_7", obsolete.get(4).toString());
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(5, delts.size());
     assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString());
@@ -451,15 +451,15 @@ public class TestAcidUtils {
         AcidUtils.getAcidState(new MockPath(fs,
             "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
     assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
-    List<FileStatus> obsolete = dir.getObsolete();
+    List<Path> obsolete = dir.getObsolete();
     assertEquals(7, obsolete.size());
-    assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).getPath().toString());
-    assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).getPath().toString());
-    assertEquals("mock:/tbl/part1/delete_delta_025_030", obsolete.get(2).getPath().toString());
-    assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(3).getPath().toString());
-    assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(4).getPath().toString());
-    assertEquals("mock:/tbl/part1/delete_delta_029_029", obsolete.get(5).getPath().toString());
-    assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(6).getPath().toString());
+    assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString());
+    assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString());
+    assertEquals("mock:/tbl/part1/delete_delta_025_030", obsolete.get(2).toString());
+    assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(3).toString());
+    assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(4).toString());
+    assertEquals("mock:/tbl/part1/delete_delta_029_029", obsolete.get(5).toString());
+    assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(6).toString());
     assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
     assertEquals(2, deltas.size());
@@ -490,11 +490,11 @@ public class TestAcidUtils {
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
-    List<FileStatus> obsolete = dir.getObsolete();
+    List<Path> obsolete = dir.getObsolete();
     assertEquals(3, obsolete.size());
-    assertEquals("mock:/tbl/part1/delete_delta_052_55", obsolete.get(0).getPath().toString());
-    assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(1).getPath().toString());
-    assertEquals("mock:/tbl/part1/delta_0060_60", obsolete.get(2).getPath().toString());
+    assertEquals("mock:/tbl/part1/delete_delta_052_55", obsolete.get(0).toString());
+    assertEquals("mock:/tbl/part1/delta_052_55", obsolete.get(1).toString());
+    assertEquals("mock:/tbl/part1/delta_0060_60", obsolete.get(2).toString());
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(6, delts.size());
     assertEquals("mock:/tbl/part1/delete_delta_40_60", delts.get(0).getPath().toString());
@@ -520,9 +520,9 @@ public class TestAcidUtils {
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
-    List<FileStatus> obsolete = dir.getObsolete();
+    List<Path> obsolete = dir.getObsolete();
     assertEquals(1, obsolete.size());
-    assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).toString());
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(1, delts.size());
     assertEquals("mock:/tbl/part1/delta_40_60", delts.get(0).getPath().toString());
index 91458ea..50ebbfa 100644 (file)
@@ -2808,11 +2808,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktable
-    // call-2: check existence of side file for mock:/mocktable/0_0
-    // call-3: open - mock:/mocktable/0_0
-    // call-4: check existence of side file for mock:/mocktable/0_1
-    // call-5: open - mock:/mocktable/0_1
-    assertEquals(5, readOpsDelta);
+    // call-2: open - mock:/mocktable/0_0
+    // call-3: open - mock:/mocktable/0_1
+    assertEquals(3, readOpsDelta);
 
     assertEquals(2, splits.length);
     // revert back to local fs
@@ -2868,11 +2866,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl
-    // call-2: check existence of side file for mock:/mocktbl/0_0
-    // call-3: open - mock:/mocktbl/0_0
-    // call-4: check existence of side file for  mock:/mocktbl/0_1
-    // call-5: open - mock:/mocktbl/0_1
-    assertEquals(5, readOpsDelta);
+    // call-2: open - mock:/mocktbl/0_0
+    // call-3: open - mock:/mocktbl/0_1
+    assertEquals(3, readOpsDelta);
 
     // force BI to avoid reading footers
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
@@ -2890,9 +2886,7 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl
-    // call-2: check existence of side file for mock:/mocktbl/0_0
-    // call-3: check existence of side file for  mock:/mocktbl/0_1
-    assertEquals(3, readOpsDelta);
+    assertEquals(1, readOpsDelta);
 
     // enable cache and use default strategy
     conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "10Mb");
@@ -2911,11 +2905,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl
-    // call-2: check existence of side file for mock:/mocktbl/0_0
-    // call-3: open - mock:/mocktbl/0_0
-    // call-4: check existence of side file for mock:/mocktbl/0_1
-    // call-5: open - mock:/mocktbl/0_1
-    assertEquals(5, readOpsDelta);
+    // call-2: open - mock:/mocktbl/0_0
+    // call-3: open - mock:/mocktbl/0_1
+    assertEquals(3, readOpsDelta);
 
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -2985,11 +2977,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktable
-    // call-2: check side file for mock:/mocktbl1/0_0
-    // call-3: open - mock:/mocktbl1/0_0
-    // call-4: check side file for  mock:/mocktbl1/0_1
-    // call-5: open - mock:/mocktbl1/0_1
-    assertEquals(5, readOpsDelta);
+    // call-2: open - mock:/mocktbl1/0_0
+    // call-3: open - mock:/mocktbl1/0_1
+    assertEquals(3, readOpsDelta);
 
     // change file length and look for cache misses
 
@@ -3026,11 +3016,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktable
-    // call-2: check side file for mock:/mocktbl1/0_0
-    // call-3: open - mock:/mocktbl1/0_0
-    // call-4: check side file for  mock:/mocktbl1/0_1
-    // call-5: open - mock:/mocktbl1/0_1
-    assertEquals(5, readOpsDelta);
+    // call-2: open - mock:/mocktbl1/0_0
+    // call-3: open - mock:/mocktbl1/0_1
+    assertEquals(3, readOpsDelta);
 
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -3101,11 +3089,9 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl2
-    // call-2: check side file for mock:/mocktbl2/0_0
-    // call-3: open - mock:/mocktbl2/0_0
-    // call-4: check side file for  mock:/mocktbl2/0_1
-    // call-5: open - mock:/mocktbl2/0_1
-    assertEquals(5, readOpsDelta);
+    // call-2: open - mock:/mocktbl2/0_0
+    // call-3: open - mock:/mocktbl2/0_1
+    assertEquals(3, readOpsDelta);
 
     // change file modification time and look for cache misses
     FileSystem fs1 = FileSystem.get(conf);
@@ -3125,9 +3111,8 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl2
-    // call-2: check side file for  mock:/mocktbl2/0_1
-    // call-3: open - mock:/mocktbl2/0_1
-    assertEquals(3, readOpsDelta);
+    // call-2: open - mock:/mocktbl2/0_1
+    assertEquals(2, readOpsDelta);
 
     // touch the next file
     fs1 = FileSystem.get(conf);
@@ -3147,9 +3132,8 @@ public class TestInputOutputFormat {
       }
     }
     // call-1: listLocatedStatus - mock:/mocktbl2
-    // call-2: check side file for  mock:/mocktbl2/0_0
-    // call-3: open - mock:/mocktbl2/0_0
-    assertEquals(3, readOpsDelta);
+    // call-2: open - mock:/mocktbl2/0_0
+    assertEquals(2, readOpsDelta);
 
     for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
       if (statistics.getScheme().equalsIgnoreCase("mock")) {
@@ -3690,13 +3674,6 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: open(mock:/mocktable7/0_0)
-    // call-2: open(mock:/mocktable7/0_0)
-    // call-3: listLocatedFileStatuses(mock:/mocktable7)
-    // call-4: getFileStatus(mock:/mocktable7/delta_0000001_0000001_0000/_metadata_acid)
-    // call-5: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001)
-    // call-6: getFileStatus(mock:/mocktable7/delta_0000001_0000001_0000/_metadata_acid)
-    // call-7: open(mock:/mocktable7/delta_0000001_0000001_0000/bucket_00001)
     assertEquals(7, readOpsDelta);
 
     // revert back to local fs
@@ -3771,11 +3748,6 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: open to read data - split 1 => mock:/mocktable8/0_0
-    // call-2: listLocatedFileStatus(mock:/mocktable8)
-    // call-3: getFileStatus(mock:/mocktable8/delta_0000001_0000001_0000/_metadata_acid)
-    // call-4: getFileStatus(mock:/mocktable8/delta_0000001_0000001_0000/_metadata_acid)
-    // call-5: open(mock:/mocktable8/delta_0000001_0000001_0000/bucket_00001)
     assertEquals(5, readOpsDelta);
 
     // revert back to local fs
index e0dfeab..1656a5b 100644 (file)
@@ -543,10 +543,10 @@ public class TestOrcRawRecordMerger {
     /*create delta_1_1_0/bucket0 with 1 row and close the file*/
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
         .inspector(inspector).bucket(BUCKET).writingBase(false).minimumWriteId(1)
-        .maximumWriteId(1).finalDestination(root);
-    Path delta1_1_0 = new Path(root, AcidUtils.deltaSubdir(
+        .maximumWriteId(2).finalDestination(root);
+    Path delta1_2_0 = new Path(root, AcidUtils.deltaSubdir(
         options.getMinimumWriteId(), options.getMaximumWriteId(), options.getStatementId()));
-    Path bucket0 = AcidUtils.createBucketFile(delta1_1_0, BUCKET);
+    Path bucket0 = AcidUtils.createBucketFile(delta1_2_0, BUCKET);
     Path bucket0SideFile = OrcAcidUtils.getSideFile(bucket0);
 
     RecordUpdater ru = of.getRecordUpdater(root, options);