HDFS-4534. Add INodeReference in order to support rename with snapshots.
authorTsz-wo Sze <szetszwo@apache.org>
Tue, 19 Mar 2013 06:27:26 +0000 (06:27 +0000)
committerTsz-wo Sze <szetszwo@apache.org>
Tue, 19 Mar 2013 06:27:26 +0000 (06:27 +0000)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1458164 13f79535-47bb-0310-9956-ffa450edef68

15 files changed:
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java [new file with mode: 0644]
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java [new file with mode: 0644]
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/Diff.java

index 7768529..93d5c95 100644 (file)
@@ -201,3 +201,6 @@ Branch-2802 Snapshot (Unreleased)
 
   HDFS-4556. Add snapshotdiff and LsSnapshottableDir tools to hdfs script.
   (Arpit Agarwal via szetszwo)
+
+  HDFS-4534. Add INodeReference in order to support rename with snapshots.
+  (szetszwo)
index 0e1f9a8..31cc86c 100644 (file)
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotAccessControlException;
@@ -558,24 +559,22 @@ public class FSDirectory implements Closeable {
     verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
     
     boolean added = false;
-    INode srcChild = null;
-    byte[] srcChildName = null;
+    final INode srcChild = srcIIP.getLastINode();
+    final byte[] srcChildName = srcChild.getLocalNameBytes();
     try {
       // remove src
-      srcChild = removeLastINode(srcIIP);
-      if (srcChild == null) {
+      final int removedSrc = removeLastINode(srcIIP);
+      if (removedSrc == -1) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + "failed to rename " + src + " to " + dst
             + " because the source can not be removed");
         return false;
       }
-      srcChildName = srcChild.getLocalNameBytes();
       srcChild.setLocalName(dstComponents[dstComponents.length - 1]);
       
       // add src to the destination
       added = addLastINodeNoQuotaCheck(dstIIP, srcChild);
       if (added) {
-        srcChild = null;
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " 
               + src + " is renamed to " + dst);
@@ -586,10 +585,16 @@ public class FSDirectory implements Closeable {
         dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot());
         // update moved leases with new filename
         getFSNamesystem().unprotectedChangeLease(src, dst);        
+
+        if (srcIIP.getLatestSnapshot() != null) {
+          createReferences4Rename(srcChild, srcChildName,
+              (INodeDirectoryWithSnapshot)srcParent.asDirectory(),
+              dstParent.asDirectory());
+        }
         return true;
       }
     } finally {
-      if (!added && srcChild != null) {
+      if (!added) {
         // put it back
         srcChild.setLocalName(srcChildName);
         addLastINodeNoQuotaCheck(srcIIP, srcChild);
@@ -707,7 +712,7 @@ public class FSDirectory implements Closeable {
       }
     }
 
-    final INode dstParent = dstIIP.getINode(-2);
+    INode dstParent = dstIIP.getINode(-2);
     if (dstParent == null) {
       error = "rename destination parent " + dst + " not found.";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
@@ -723,27 +728,33 @@ public class FSDirectory implements Closeable {
 
     // Ensure dst has quota to accommodate rename
     verifyQuotaForRename(srcIIP.getINodes(), dstIIP.getINodes());
-    INode removedSrc = removeLastINode(srcIIP);
-    if (removedSrc == null) {
+
+    boolean undoRemoveSrc = true;
+    final int removedSrc = removeLastINode(srcIIP);
+    if (removedSrc == -1) {
       error = "Failed to rename " + src + " to " + dst
           + " because the source can not be removed";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
           + error);
       throw new IOException(error);
     }
-    final byte[] srcChildName = removedSrc.getLocalNameBytes();
-    byte[] dstChildName = null;
+    final INode srcChild = srcIIP.getLastINode();
+    final byte[] srcChildName = srcChild.getLocalNameBytes();
+
+    boolean undoRemoveDst = false;
     INode removedDst = null;
     try {
       if (dstInode != null) { // dst exists remove it
-        removedDst = removeLastINode(dstIIP);
-        dstChildName = removedDst.getLocalNameBytes();
+        if (removeLastINode(dstIIP) != -1) {
+          removedDst = dstIIP.getLastINode();
+          undoRemoveDst = true;
+        }
       }
+      srcChild.setLocalName(dstIIP.getLastLocalName());
 
-      removedSrc.setLocalName(dstIIP.getLastLocalName());
       // add src as dst to complete rename
-      if (addLastINodeNoQuotaCheck(dstIIP, removedSrc)) {
-        removedSrc = null;
+      if (addLastINodeNoQuotaCheck(dstIIP, srcChild)) {
+        undoRemoveSrc = false;
         if (NameNode.stateChangeLog.isDebugEnabled()) {
           NameNode.stateChangeLog.debug(
               "DIR* FSDirectory.unprotectedRenameTo: " + src
@@ -752,6 +763,7 @@ public class FSDirectory implements Closeable {
 
         final INode srcParent = srcIIP.getINode(-2);
         srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshot());
+        dstParent = dstIIP.getINode(-2);
         dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot());
         // update moved lease with new filename
         getFSNamesystem().unprotectedChangeLease(src, dst);
@@ -759,14 +771,19 @@ public class FSDirectory implements Closeable {
         // Collect the blocks and remove the lease for previous dst
         long filesDeleted = -1;
         if (removedDst != null) {
-          INode rmdst = removedDst;
-          removedDst = null;
+          undoRemoveDst = false;
           BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
-          filesDeleted = rmdst.cleanSubtree(null, dstIIP.getLatestSnapshot(),
-              collectedBlocks).get(Quota.NAMESPACE);
+          filesDeleted = removedDst.cleanSubtree(null,
+              dstIIP.getLatestSnapshot(), collectedBlocks).get(Quota.NAMESPACE);
           getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
         }
 
+        if (srcIIP.getLatestSnapshot() != null) {
+          createReferences4Rename(srcChild, srcChildName,
+              (INodeDirectoryWithSnapshot)srcParent.asDirectory(),
+              dstParent.asDirectory());
+        }
+
         if (snapshottableDirs.size() > 0) {
           // There are snapshottable directories (without snapshots) to be
           // deleted. Need to update the SnapshotManager.
@@ -775,14 +792,13 @@ public class FSDirectory implements Closeable {
         return filesDeleted >= 0;
       }
     } finally {
-      if (removedSrc != null) {
+      if (undoRemoveSrc) {
         // Rename failed - restore src
-        removedSrc.setLocalName(srcChildName);
-        addLastINodeNoQuotaCheck(srcIIP, removedSrc);
+        srcChild.setLocalName(srcChildName);
+        addLastINodeNoQuotaCheck(srcIIP, srcChild);
       }
-      if (removedDst != null) {
+      if (undoRemoveDst) {
         // Rename failed - restore dst
-        removedDst.setLocalName(dstChildName);
         addLastINodeNoQuotaCheck(dstIIP, removedDst);
       }
     }
@@ -791,6 +807,18 @@ public class FSDirectory implements Closeable {
     throw new IOException("rename from " + src + " to " + dst + " failed.");
   }
 
+  /** The renamed inode is also in a snapshot, create references */
+  private static void createReferences4Rename(final INode srcChild,
+      final byte[] srcChildName, final INodeDirectoryWithSnapshot srcParent,
+      final INodeDirectory dstParent) {
+    final INodeReference.WithCount ref;
+    if (srcChild.isReference()) {
+      ref = (INodeReference.WithCount)srcChild.asReference().getReferredINode();
+    } else {
+      ref = dstParent.asDirectory().replaceChild4Reference(srcChild);
+    }
+    srcParent.replaceRemovedChild4Reference(srcChild, ref, srcChildName);
+  }
   /**
    * Set file replication
    * 
@@ -1137,10 +1165,16 @@ public class FSDirectory implements Closeable {
     iip.setLastINode(targetNode);
 
     // Remove the node from the namespace
-    removeLastINode(iip);
+    final int removed = removeLastINode(iip);
+    if (removed == -1) {
+      return -1;
+    }
 
     // set the parent's modification time
     targetNode.getParent().updateModificationTime(mtime, latestSnapshot);
+    if (removed == 0) {
+      return 0;
+    }
 
     // collect block
     final long inodesRemoved = targetNode.cleanSubtree(null, latestSnapshot,
@@ -1205,6 +1239,7 @@ public class FSDirectory implements Closeable {
     Preconditions.checkState(hasWriteLock());
 
     oldnode.getParent().replaceChild(oldnode, newnode);
+    oldnode.clear();
 
     /* Currently oldnode and newnode are assumed to contain the same
      * blocks. Otherwise, blocks need to be removed from the blocksMap.
@@ -1917,6 +1952,9 @@ public class FSDirectory implements Closeable {
     if (!added) {
       updateCountNoQuotaCheck(iip, pos,
           -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
+    } else {
+      // update parent node
+      iip.setINode(pos - 1, child.getParent());
     }
     return added;
   }
@@ -1933,23 +1971,35 @@ public class FSDirectory implements Closeable {
   /**
    * Remove the last inode in the path from the namespace.
    * Count of each ancestor with quota is also updated.
-   * @return the removed node; null if the removal fails.
+   * @return -1 for failing to remove;
+   *          0 for removing a reference;
+   *          1 for removing a non-reference inode. 
    * @throws NSQuotaExceededException 
    */
-  private INode removeLastINode(final INodesInPath inodesInPath)
+  private int removeLastINode(final INodesInPath iip)
       throws QuotaExceededException {
-    final Snapshot latestSnapshot = inodesInPath.getLatestSnapshot();
-    final INode[] inodes = inodesInPath.getINodes();
-    final int pos = inodes.length - 1;
-    final INodeDirectory parent = inodes[pos-1].asDirectory();
-    final boolean removed = parent.removeChild(inodes[pos], latestSnapshot);
-    if (removed && latestSnapshot == null) {
-      inodesInPath.setINode(pos - 1, inodes[pos].getParent());
-      final Quota.Counts counts = inodes[pos].computeQuotaUsage();
-      updateCountNoQuotaCheck(inodesInPath, pos,
+    final Snapshot latestSnapshot = iip.getLatestSnapshot();
+    final INode last = iip.getLastINode();
+    final INodeDirectory parent = iip.getINode(-2).asDirectory();
+    if (!parent.removeChild(last, latestSnapshot)) {
+      return -1;
+    }
+
+    if (parent != last.getParent()) {
+      // parent is changed
+      iip.setINode(-2, last.getParent());
+    }
+    
+    if (latestSnapshot == null) {
+      final Quota.Counts counts = last.computeQuotaUsage();
+      updateCountNoQuotaCheck(iip, iip.getINodes().length - 1,
           -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
+
+      if (INodeReference.tryRemoveReference(last) > 0) {
+        return 0;
+      }
     }
-    return removed? inodes[pos]: null;
+    return 1;
   }
   
   /**
index b9def47..7dfa1c1 100644 (file)
@@ -314,7 +314,7 @@ public class FSImageFormat {
     }
 
   /** Update the root node's attributes */
-  private void updateRootAttr(INode root) {                                                           
+  private void updateRootAttr(INodeWithAdditionalFields root) {                                                           
     long nsQuota = root.getNsQuota();
     long dsQuota = root.getDsQuota();
     FSDirectory fsDir = namesystem.dir;
@@ -380,7 +380,7 @@ public class FSImageFormat {
       if (in.readShort() != 0) {
         throw new IOException("First node is not root");
       }
-      final INode root = loadINode(null, false, in);
+      final INodeWithAdditionalFields root = loadINode(null, false, in);
       // update the root's attributes
       updateRootAttr(root);
     }
@@ -465,8 +465,8 @@ public class FSImageFormat {
     INodeDirectory parentINode = fsDir.rootDir;
     for (long i = 0; i < numFiles; i++) {
       pathComponents = FSImageSerialization.readPathComponents(in);
-      final INode newNode = loadINode(pathComponents[pathComponents.length-1],
-          false, in);
+      final INodeWithAdditionalFields newNode = loadINode(
+          pathComponents[pathComponents.length-1], false, in);
 
       if (isRoot(pathComponents)) { // it is the root
         // update the root's attributes
@@ -541,7 +541,7 @@ public class FSImageFormat {
    * @param in data input stream from which image is read
    * @return an inode
    */
-  INode loadINode(final byte[] localName, boolean isSnapshotINode,
+  INodeWithAdditionalFields loadINode(final byte[] localName, boolean isSnapshotINode,
       DataInputStream in) throws IOException {
     final int imgVersion = getLayoutVersion();
     final long inodeId = namesystem.allocateNewInodeId();
index 9b69612..33203ee 100644 (file)
@@ -79,8 +79,8 @@ public class FSImageSerialization {
     final FsPermission FILE_PERM = new FsPermission((short) 0);
   }
 
-  private static void writePermissionStatus(INode inode, DataOutput out
-      ) throws IOException {
+  private static void writePermissionStatus(INodeWithAdditionalFields inode,
+      DataOutput out) throws IOException {
     final FsPermission p = TL_DATA.get().FILE_PERM;
     p.fromShort(inode.getFsPermissionShort());
     PermissionStatus.write(out, inode.getUserName(), inode.getGroupName(), p);
index 37be098..c431ff7 100644 (file)
@@ -41,7 +41,6 @@ import org.apache.hadoop.hdfs.util.Diff;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.primitives.SignedBytes;
 //import org.apache.hadoop.hdfs.util.EnumCounters;
 
@@ -54,141 +53,47 @@ import com.google.common.primitives.SignedBytes;
 public abstract class INode implements Diff.Element<byte[]> {
   public static final Log LOG = LogFactory.getLog(INode.class);
 
-  private static enum PermissionStatusFormat {
-    MODE(0, 16),
-    GROUP(MODE.OFFSET + MODE.LENGTH, 25),
-    USER(GROUP.OFFSET + GROUP.LENGTH, 23);
+  /** parent is either an {@link INodeDirectory} or an {@link INodeReference}.*/
+  private INode parent = null;
 
-    final int OFFSET;
-    final int LENGTH; //bit length
-    final long MASK;
-
-    PermissionStatusFormat(int offset, int length) {
-      OFFSET = offset;
-      LENGTH = length;
-      MASK = ((-1L) >>> (64 - LENGTH)) << OFFSET;
-    }
-
-    long retrieve(long record) {
-      return (record & MASK) >>> OFFSET;
-    }
-
-    long combine(long bits, long record) {
-      return (record & ~MASK) | (bits << OFFSET);
-    }
-
-    /** Encode the {@link PermissionStatus} to a long. */
-    static long toLong(PermissionStatus ps) {
-      long permission = 0L;
-      final int user = SerialNumberManager.INSTANCE.getUserSerialNumber(
-          ps.getUserName());
-      permission = USER.combine(user, permission);
-      final int group = SerialNumberManager.INSTANCE.getGroupSerialNumber(
-          ps.getGroupName());
-      permission = GROUP.combine(group, permission);
-      final int mode = ps.getPermission().toShort();
-      permission = MODE.combine(mode, permission);
-      return permission;
-    }
-  }
-
-  /**
-   * The inode id
-   */
-  final private long id;
-
-  /**
-   *  The inode name is in java UTF8 encoding; 
-   *  The name in HdfsFileStatus should keep the same encoding as this.
-   *  if this encoding is changed, implicitly getFileInfo and listStatus in
-   *  clientProtocol are changed; The decoding at the client
-   *  side should change accordingly.
-   */
-  private byte[] name = null;
-  /** 
-   * Permission encoded using {@link PermissionStatusFormat}.
-   * Codes other than {@link #clonePermissionStatus(INode)}
-   * and {@link #updatePermissionStatus(PermissionStatusFormat, long)}
-   * should not modify it.
-   */
-  private long permission = 0L;
-  private INodeDirectory parent = null;
-  private long modificationTime = 0L;
-  private long accessTime = 0L;
-
-  private INode(long id, byte[] name, long permission, INodeDirectory parent,
-      long modificationTime, long accessTime) {
-    this.id = id;
-    this.name = name;
-    this.permission = permission;
+  INode(INode parent) {
     this.parent = parent;
-    this.modificationTime = modificationTime;
-    this.accessTime = accessTime;
-  }
-
-  INode(long id, byte[] name, PermissionStatus permissions,
-      long modificationTime, long accessTime) {
-    this(id, name, PermissionStatusFormat.toLong(permissions), null,
-        modificationTime, accessTime);
-  }
-  
-  /** @param other Other node to be copied */
-  INode(INode other) {
-    this(other.id, other.name, other.permission, other.parent, 
-        other.modificationTime, other.accessTime);
   }
 
   /** Get inode id */
-  public long getId() {
-    return this.id;
-  }
+  public abstract long getId();
 
   /**
    * Check whether this is the root inode.
    */
-  boolean isRoot() {
-    return name.length == 0;
+  final boolean isRoot() {
+    return getLocalNameBytes().length == 0;
   }
 
-  /** Clone the {@link PermissionStatus}. */
-  void clonePermissionStatus(INode that) {
-    this.permission = that.permission;
-  }
   /** Get the {@link PermissionStatus} */
-  public final PermissionStatus getPermissionStatus(Snapshot snapshot) {
-    return new PermissionStatus(getUserName(snapshot), getGroupName(snapshot),
-        getFsPermission(snapshot));
-  }
+  abstract PermissionStatus getPermissionStatus(Snapshot snapshot);
+
   /** The same as getPermissionStatus(null). */
-  public final PermissionStatus getPermissionStatus() {
+  final PermissionStatus getPermissionStatus() {
     return getPermissionStatus(null);
   }
-  private void updatePermissionStatus(PermissionStatusFormat f, long n) {
-    this.permission = f.combine(n, permission);
-  }
+
   /**
    * @param snapshot
    *          if it is not null, get the result from the given snapshot;
    *          otherwise, get the result from the current inode.
    * @return user name
    */
-  public final String getUserName(Snapshot snapshot) {
-    if (snapshot != null) {
-      return getSnapshotINode(snapshot).getUserName();
-    }
+  abstract String getUserName(Snapshot snapshot);
 
-    int n = (int)PermissionStatusFormat.USER.retrieve(permission);
-    return SerialNumberManager.INSTANCE.getUser(n);
-  }
   /** The same as getUserName(null). */
   public final String getUserName() {
     return getUserName(null);
   }
+
   /** Set user */
-  final void setUser(String user) {
-    int n = SerialNumberManager.INSTANCE.getUserSerialNumber(user);
-    updatePermissionStatus(PermissionStatusFormat.USER, n);
-  }
+  abstract void setUser(String user);
+
   /** Set user */
   final INode setUser(String user, Snapshot latest)
       throws QuotaExceededException {
@@ -202,23 +107,16 @@ public abstract class INode implements Diff.Element<byte[]> {
    *          otherwise, get the result from the current inode.
    * @return group name
    */
-  public final String getGroupName(Snapshot snapshot) {
-    if (snapshot != null) {
-      return getSnapshotINode(snapshot).getGroupName();
-    }
+  abstract String getGroupName(Snapshot snapshot);
 
-    int n = (int)PermissionStatusFormat.GROUP.retrieve(permission);
-    return SerialNumberManager.INSTANCE.getGroup(n);
-  }
   /** The same as getGroupName(null). */
   public final String getGroupName() {
     return getGroupName(null);
   }
+
   /** Set group */
-  final void setGroup(String group) {
-    int n = SerialNumberManager.INSTANCE.getGroupSerialNumber(group);
-    updatePermissionStatus(PermissionStatusFormat.GROUP, n);
-  }
+  abstract void setGroup(String group);
+
   /** Set group */
   final INode setGroup(String group, Snapshot latest)
       throws QuotaExceededException {
@@ -226,32 +124,23 @@ public abstract class INode implements Diff.Element<byte[]> {
     nodeToUpdate.setGroup(group);
     return nodeToUpdate;
   }
+
   /**
    * @param snapshot
    *          if it is not null, get the result from the given snapshot;
    *          otherwise, get the result from the current inode.
    * @return permission.
    */
-  public final FsPermission getFsPermission(Snapshot snapshot) {
-    if (snapshot != null) {
-      return getSnapshotINode(snapshot).getFsPermission();
-    }
-
-    return new FsPermission(
-        (short)PermissionStatusFormat.MODE.retrieve(permission));
-  }
+  abstract FsPermission getFsPermission(Snapshot snapshot);
+  
   /** The same as getFsPermission(null). */
   public final FsPermission getFsPermission() {
     return getFsPermission(null);
   }
-  protected short getFsPermissionShort() {
-    return (short)PermissionStatusFormat.MODE.retrieve(permission);
-  }
+
   /** Set the {@link FsPermission} of this {@link INode} */
-  void setPermission(FsPermission permission) {
-    final short mode = permission.toShort();
-    updatePermissionStatus(PermissionStatusFormat.MODE, mode);
-  }
+  abstract void setPermission(FsPermission permission);
+
   /** Set the {@link FsPermission} of this {@link INode} */
   INode setPermission(FsPermission permission, Snapshot latest)
       throws QuotaExceededException {
@@ -270,10 +159,24 @@ public abstract class INode implements Diff.Element<byte[]> {
 
   /** Is this inode in the latest snapshot? */
   public final boolean isInLatestSnapshot(final Snapshot latest) {
-    return latest != null
-        && (parent == null
-            || (parent.isInLatestSnapshot(latest)
-                && this == parent.getChild(getLocalNameBytes(), latest)));
+    if (latest == null) {
+      return false;
+    }
+    final INodeDirectory parentDir = getParent();
+    if (parentDir == null) { // root
+      return true;
+    }
+    if (!parentDir.isInLatestSnapshot(latest)) {
+      return false;
+    }
+    final INode child = parentDir.getChild(getLocalNameBytes(), latest);
+    if (this == child) {
+      return true;
+    }
+    if (child == null || !(child.isReference())) {
+      return false;
+    }
+    return this == child.asReference().getReferredINode();
   }
 
   /**
@@ -289,6 +192,17 @@ public abstract class INode implements Diff.Element<byte[]> {
   abstract INode recordModification(final Snapshot latest)
       throws QuotaExceededException;
 
+  /** Check whether it's a reference. */
+  public boolean isReference() {
+    return false;
+  }
+
+  /** Cast this inode to an {@link INodeReference}.  */
+  public INodeReference asReference() {
+    throw new IllegalStateException("Current inode is not a reference: "
+        + this.toDetailString());
+  }
+
   /**
    * Check whether it's a file.
    */
@@ -316,6 +230,19 @@ public abstract class INode implements Diff.Element<byte[]> {
   }
 
   /**
+   * Check whether it's a symlink
+   */
+  public boolean isSymlink() {
+    return false;
+  }
+
+  /** Cast this inode to an {@link INodeSymlink}.  */
+  public INodeSymlink asSymlink() {
+    throw new IllegalStateException("Current inode is not a symlink: "
+        + this.toDetailString());
+  }
+
+  /**
    * Clean the subtree under this inode and collect the blocks from the descents
    * for further block deletion/update. The current inode can either resides in
    * the current tree or be stored as a snapshot copy.
@@ -420,8 +347,9 @@ public abstract class INode implements Diff.Element<byte[]> {
    */
   public void addSpaceConsumed(long nsDelta, long dsDelta)
       throws QuotaExceededException {
-    if (parent != null) {
-      parent.addSpaceConsumed(nsDelta, dsDelta);
+    final INodeDirectory parentDir = getParent();
+    if (parentDir != null) {
+      parentDir.addSpaceConsumed(nsDelta, dsDelta);
     }
   }
 
@@ -460,7 +388,8 @@ public abstract class INode implements Diff.Element<byte[]> {
   /**
    * @return null if the local name is null; otherwise, return the local name.
    */
-  public String getLocalName() {
+  public final String getLocalName() {
+    final byte[] name = getLocalNameBytes();
     return name == null? null: DFSUtil.bytes2String(name);
   }
 
@@ -468,21 +397,17 @@ public abstract class INode implements Diff.Element<byte[]> {
    * @return null if the local name is null;
    *         otherwise, return the local name byte array.
    */
-  public byte[] getLocalNameBytes() {
-    return name;
-  }
+  public abstract byte[] getLocalNameBytes();
 
   @Override
-  public byte[] getKey() {
+  public final byte[] getKey() {
     return getLocalNameBytes();
   }
 
   /**
    * Set local file name
    */
-  public void setLocalName(byte[] name) {
-    this.name = name;
-  }
+  public abstract void setLocalName(byte[] name);
 
   public String getFullPathName() {
     // Get the full path name of this inode.
@@ -492,7 +417,7 @@ public abstract class INode implements Diff.Element<byte[]> {
   /** 
    * @return The full path name represented in a list of byte array
    */
-  public byte[][] getRelativePathNameBytes(INode ancestor) {
+  public final byte[][] getRelativePathNameBytes(INode ancestor) {
     return FSDirectory.getRelativePathNameBytes(this, ancestor);
   }
   
@@ -514,25 +439,37 @@ public abstract class INode implements Diff.Element<byte[]> {
 
   @VisibleForTesting
   public String toDetailString() {
+    final INodeDirectory p = getParent();
     return toStringWithObjectType()
-        + ", parent=" + (parent == null? null: parent.toStringWithObjectType());
+        + ", parent=" + (p == null? null: p.toStringWithObjectType());
+  }
+
+  /** @return the parent directory */
+  public final INodeDirectory getParent() {
+    return parent == null? null
+        : parent.isReference()? getParentReference().getParent(): parent.asDirectory();
   }
 
   /**
-   * Get parent directory 
-   * @return parent INode
+   * @return the parent as a reference if this is a referred inode;
+   *         otherwise, return null.
    */
-  public final INodeDirectory getParent() {
-    return this.parent;
+  public INodeReference getParentReference() {
+    return parent == null || !parent.isReference()? null: (INodeReference)parent;
   }
 
   /** Set parent directory */
-  public void setParent(INodeDirectory parent) {
+  public final void setParent(INodeDirectory parent) {
+    this.parent = parent;
+  }
+
+  /** Set container. */
+  public final void setParentReference(INodeReference parent) {
     this.parent = parent;
   }
 
   /** Clear references to other objects. */
-  public void clearReferences() {
+  public void clear() {
     setParent(null);
   }
 
@@ -542,13 +479,7 @@ public abstract class INode implements Diff.Element<byte[]> {
    *          otherwise, get the result from the current inode.
    * @return modification time.
    */
-  public final long getModificationTime(Snapshot snapshot) {
-    if (snapshot != null) {
-      return getSnapshotINode(snapshot).modificationTime;
-    }
-
-    return this.modificationTime;
-  }
+  abstract long getModificationTime(Snapshot snapshot);
 
   /** The same as getModificationTime(null). */
   public final long getModificationTime() {
@@ -556,23 +487,12 @@ public abstract class INode implements Diff.Element<byte[]> {
   }
 
   /** Update modification time if it is larger than the current value. */
-  public final INode updateModificationTime(long mtime, Snapshot latest)
-      throws QuotaExceededException {
-    Preconditions.checkState(isDirectory());
-    if (mtime <= modificationTime) {
-      return this;
-    }
-    return setModificationTime(mtime, latest);
-  }
-
-  void cloneModificationTime(INode that) {
-    this.modificationTime = that.modificationTime;
-  }
+  public abstract INode updateModificationTime(long mtime, Snapshot latest)
+      throws QuotaExceededException;
 
   /** Set the last modification time of inode. */
-  public final void setModificationTime(long modificationTime) {
-    this.modificationTime = modificationTime;
-  }
+  public abstract void setModificationTime(long modificationTime);
+
   /** Set the last modification time of inode. */
   public final INode setModificationTime(long modificationTime, Snapshot latest)
       throws QuotaExceededException {
@@ -587,13 +507,7 @@ public abstract class INode implements Diff.Element<byte[]> {
    *          otherwise, get the result from the current inode.
    * @return access time
    */
-  public final long getAccessTime(Snapshot snapshot) {
-    if (snapshot != null) {
-      return getSnapshotINode(snapshot).accessTime;
-    }
-
-    return accessTime;
-  }
+  abstract long getAccessTime(Snapshot snapshot);
 
   /** The same as getAccessTime(null). */
   public final long getAccessTime() {
@@ -603,31 +517,18 @@ public abstract class INode implements Diff.Element<byte[]> {
   /**
    * Set last access time of inode.
    */
-  public void setAccessTime(long accessTime) {
-    this.accessTime = accessTime;
-  }
+  public abstract void setAccessTime(long accessTime);
+
   /**
    * Set last access time of inode.
    */
-  public INode setAccessTime(long accessTime, Snapshot latest)
+  public final INode setAccessTime(long accessTime, Snapshot latest)
       throws QuotaExceededException {
     final INode nodeToUpdate = recordModification(latest);
     nodeToUpdate.setAccessTime(accessTime);
     return nodeToUpdate;
   }
 
-  /**
-   * Check whether it's a symlink
-   */
-  public boolean isSymlink() {
-    return false;
-  }
-
-  /** Cast this inode to an {@link INodeSymlink}.  */
-  public INodeSymlink asSymlink() {
-    throw new IllegalStateException("Current inode is not a symlink: "
-        + this.toDetailString());
-  }
 
   /**
    * Breaks file path into components.
@@ -683,6 +584,7 @@ public abstract class INode implements Diff.Element<byte[]> {
 
   @Override
   public final int compareTo(byte[] bytes) {
+    final byte[] name = getLocalNameBytes();
     final byte[] left = name == null? DFSUtil.EMPTY_BYTES: name;
     final byte[] right = bytes == null? DFSUtil.EMPTY_BYTES: bytes;
     return SignedBytes.lexicographicalComparator().compare(left, right);
@@ -696,12 +598,13 @@ public abstract class INode implements Diff.Element<byte[]> {
     if (that == null || !(that instanceof INode)) {
       return false;
     }
-    return Arrays.equals(this.name, ((INode)that).name);
+    return Arrays.equals(this.getLocalNameBytes(),
+        ((INode)that).getLocalNameBytes());
   }
 
   @Override
   public final int hashCode() {
-    return Arrays.hashCode(this.name);
+    return Arrays.hashCode(getLocalNameBytes());
   }
   
   /**
@@ -728,7 +631,9 @@ public abstract class INode implements Diff.Element<byte[]> {
     out.print("   (");
     out.print(getObjectString());
     out.print("), parent=");
-    out.print(parent == null? null: parent.getLocalName() + "/");
+
+    final INodeDirectory p = getParent();
+    out.print(p == null? null: p.getLocalName() + "/");
     out.print(", " + getPermissionStatus(snapshot));
   }
   
index 2123dab..238f35f 100644 (file)
@@ -46,7 +46,7 @@ import com.google.common.base.Preconditions;
 /**
  * Directory INode class.
  */
-public class INodeDirectory extends INode {
+public class INodeDirectory extends INodeWithAdditionalFields {
   /** Cast INode to INodeDirectory. */
   public static INodeDirectory valueOf(INode inode, Object path
       ) throws FileNotFoundException, PathIsNotDirectoryException {
@@ -108,17 +108,6 @@ public class INodeDirectory extends INode {
     return children == null? -1: Collections.binarySearch(children, name);
   }
 
-  private int searchChildrenForExistingINode(final INode inode) {
-    Preconditions.checkNotNull(children);
-    final byte[] name = inode.getLocalNameBytes();
-    final int i = searchChildren(name);
-    if (i < 0) {
-      throw new AssertionError("Child not found: name="
-          + DFSUtil.bytes2String(name));
-    }
-    return i;
-  }
-
   /**
    * Remove the specified child from this directory.
    * 
@@ -144,7 +133,6 @@ public class INodeDirectory extends INode {
    * @return true if the child is removed; false if the child is not found.
    */
   protected final boolean removeChild(final INode child) {
-    Preconditions.checkNotNull(children);
     final int i = searchChildren(child.getLocalNameBytes());
     if (i < 0) {
       return false;
@@ -208,22 +196,47 @@ public class INodeDirectory extends INode {
 
   /** Replace itself with the given directory. */
   private final <N extends INodeDirectory> N replaceSelf(final N newDir) {
-    final INodeDirectory parent = getParent();
-    Preconditions.checkArgument(parent != null, "parent is null, this=%s", this);
-    parent.replaceChild(this, newDir);
+    final INodeReference ref = getParentReference();
+    if (ref != null) {
+      ref.setReferredINode(newDir);
+    } else {
+      final INodeDirectory parent = getParent();
+      Preconditions.checkArgument(parent != null, "parent is null, this=%s", this);
+      parent.replaceChild(this, newDir);
+    }
+    clear();
     return newDir;
   }
 
+  /** Replace the given child with a new child. */
   public void replaceChild(final INode oldChild, final INode newChild) {
     Preconditions.checkNotNull(children);
-    final int i = searchChildrenForExistingINode(newChild);
-    final INode removed = children.set(i, newChild);
-    Preconditions.checkState(removed == oldChild);
-    oldChild.clearReferences();
+    final int i = searchChildren(newChild.getLocalNameBytes());
+    Preconditions.checkState(i >= 0);
+    Preconditions.checkState(oldChild == children.get(i));
+    
+    if (oldChild.isReference()) {
+      final INode withCount = oldChild.asReference().getReferredINode();
+      withCount.asReference().setReferredINode(newChild);
+    } else {
+      final INode removed = children.set(i, newChild);
+      Preconditions.checkState(removed == oldChild);
+    }
+  }
+
+  INodeReference.WithCount replaceChild4Reference(INode oldChild) {
+    Preconditions.checkArgument(!oldChild.isReference());
+    final INodeReference.WithCount withCount
+        = new INodeReference.WithCount(oldChild);
+    final INodeReference ref = new INodeReference(withCount);
+    withCount.incrementReferenceCount();
+    replaceChild(oldChild, ref);
+    return withCount;
   }
 
   private void replaceChildFile(final INodeFile oldChild, final INodeFile newChild) {
     replaceChild(oldChild, newChild);
+    oldChild.clear();
     newChild.updateBlockCollection();
   }
 
@@ -592,8 +605,8 @@ public class INodeDirectory extends INode {
   }
 
   @Override
-  public void clearReferences() {
-    super.clearReferences();
+  public void clear() {
+    super.clear();
     clearChildren();
   }
 
@@ -625,7 +638,7 @@ public class INodeDirectory extends INode {
     for (INode child : getChildrenList(null)) {
       child.destroyAndCollectBlocks(collectedBlocks);
     }
-    clearReferences();
+    clear();
   }
   
   @Override
@@ -784,7 +797,7 @@ public class INodeDirectory extends INode {
     }
     
     void setINode(int i, INode inode) {
-      inodes[i] = inode;
+      inodes[i >= 0? i: inodes.length + i] = inode;
     }
     
     void setLastINode(INode last) {
index dc337f3..7fb0601 100644 (file)
@@ -42,7 +42,7 @@ import com.google.common.base.Preconditions;
 
 /** I-node for closed file. */
 @InterfaceAudience.Private
-public class INodeFile extends INode implements BlockCollection {
+public class INodeFile extends INodeWithAdditionalFields implements BlockCollection {
   /** The same as valueOf(inode, path, false). */
   public static INodeFile valueOf(INode inode, String path
       ) throws FileNotFoundException {
@@ -309,7 +309,7 @@ public class INodeFile extends INode implements BlockCollection {
       }
     }
     setBlocks(null);
-    clearReferences();
+    clear();
     
     if (this instanceof FileWithSnapshot) {
       ((FileWithSnapshot) this).getDiffs().clear();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
new file mode 100644 (file)
index 0000000..cc4cebe
--- /dev/null
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+
+/**
+ * An anonymous reference to an inode.
+ *
+ * This class and its subclasses are used to support multiple access paths.
+ * A file/directory may have multiple access paths when it is stored in some
+ * snapshots and it is renamed/moved to other locations.
+ * 
+ * For example,
+ * (1) Support we have /abc/foo, say the inode of foo is inode(id=1000,name=foo)
+ * (2) create snapshot s0 for /abc
+ * (3) mv /abc/foo /xyz/bar, i.e. inode(id=1000,name=...) is renamed from "foo"
+ *     to "bar" and its parent becomes /xyz.
+ * 
+ * Then, /xyz/bar and /abc/.snapshot/s0/foo are two different access paths to
+ * the same inode, inode(id=1000,name=bar).
+ *
+ * With references, we have the following
+ * - /abc has a child ref(id=1001,name=foo).
+ * - /xyz has a child ref(id=1002) 
+ * - Both ref(id=1001,name=foo) and ref(id=1002) point to another reference,
+ *   ref(id=1003,count=2).
+ * - Finally, ref(id=1003,count=2) points to inode(id=1000,name=bar).
+ * 
+ * Note 1: For a reference without name, e.g. ref(id=1002), it uses the name
+ *         of the referred inode.
+ * Note 2: getParent() always returns the parent in the current state, e.g.
+ *         inode(id=1000,name=bar).getParent() returns /xyz but not /abc.
+ */
+public class INodeReference extends INode {
+  /**
+   * Try to remove the given reference and then return the reference count.
+   * If the given inode is not a reference, return -1;
+   */
+  public static int tryRemoveReference(INode inode) {
+    if (!inode.isReference()) {
+      return -1;
+    }
+    return removeReference(inode.asReference());
+  }
+
+  /**
+   * Remove the given reference and then return the reference count.
+   * If the referred inode is not a WithCount, return -1;
+   */
+  private static int removeReference(INodeReference ref) {
+    final INode referred = ref.getReferredINode();
+    if (!(referred instanceof WithCount)) {
+      return -1;
+    }
+    return ((WithCount)referred).decrementReferenceCount();
+  }
+
+  private INode referred;
+  
+  INodeReference(INode referred) {
+    super(referred.getParent());
+    this.referred = referred;
+
+    referred.setParentReference(this);
+  }
+
+
+  public final INode getReferredINode() {
+    return referred;
+  }
+
+  public final void setReferredINode(INode referred) {
+    this.referred = referred;
+  }
+  
+  @Override
+  public final boolean isReference() {
+    return true;
+  }
+  
+  @Override
+  public final INodeReference asReference() {
+    return this;
+  }
+
+  @Override
+  public final boolean isFile() {
+    return referred.isFile();
+  }
+  
+  @Override
+  public final INodeFile asFile() {
+    return referred.asFile();
+  }
+  
+  @Override
+  public final boolean isDirectory() {
+    return referred.isDirectory();
+  }
+  
+  @Override
+  public final INodeDirectory asDirectory() {
+    return referred.asDirectory();
+  }
+  
+  @Override
+  public final boolean isSymlink() {
+    return referred.isSymlink();
+  }
+  
+  @Override
+  public final INodeSymlink asSymlink() {
+    return referred.asSymlink();
+  }
+
+  @Override
+  public byte[] getLocalNameBytes() {
+    return referred.getLocalNameBytes();
+  }
+
+  @Override
+  public void setLocalName(byte[] name) {
+    referred.setLocalName(name);
+  }
+
+  @Override
+  public final long getId() {
+    return referred.getId();
+  }
+  
+  @Override
+  public final PermissionStatus getPermissionStatus(Snapshot snapshot) {
+    return referred.getPermissionStatus(snapshot);
+  }
+  
+  @Override
+  public final String getUserName(Snapshot snapshot) {
+    return referred.getUserName(snapshot);
+  }
+  
+  @Override
+  final void setUser(String user) {
+    referred.setUser(user);
+  }
+  
+  @Override
+  public final String getGroupName(Snapshot snapshot) {
+    return referred.getGroupName(snapshot);
+  }
+  
+  @Override
+  final void setGroup(String group) {
+    referred.setGroup(group);
+  }
+  
+  @Override
+  public final FsPermission getFsPermission(Snapshot snapshot) {
+    return referred.getFsPermission(snapshot);
+  }
+  
+  @Override
+  void setPermission(FsPermission permission) {
+    referred.setPermission(permission);
+  }
+  
+  @Override
+  public final long getModificationTime(Snapshot snapshot) {
+    return referred.getModificationTime(snapshot);
+  }
+  
+  @Override
+  public final INode updateModificationTime(long mtime, Snapshot latest)
+      throws QuotaExceededException {
+    return referred.updateModificationTime(mtime, latest);
+  }
+  
+  @Override
+  public final void setModificationTime(long modificationTime) {
+    referred.setModificationTime(modificationTime);
+  }
+  
+  @Override
+  public final long getAccessTime(Snapshot snapshot) {
+    return referred.getAccessTime(snapshot);
+  }
+  
+  @Override
+  public final void setAccessTime(long accessTime) {
+    referred.setAccessTime(accessTime);
+  }
+
+  @Override
+  final INode recordModification(Snapshot latest) throws QuotaExceededException {
+    return referred.recordModification(latest);
+  }
+
+  @Override
+  public final Quota.Counts cleanSubtree(Snapshot snapshot, Snapshot prior,
+      BlocksMapUpdateInfo collectedBlocks) throws QuotaExceededException {
+    return referred.cleanSubtree(snapshot, prior, collectedBlocks);
+  }
+
+  @Override
+  public final void destroyAndCollectBlocks(BlocksMapUpdateInfo collectedBlocks) {
+    if (removeReference(this) <= 0) {
+      referred.destroyAndCollectBlocks(collectedBlocks);
+    }
+  }
+
+  @Override
+  public final Content.CountsMap computeContentSummary(Content.CountsMap countsMap) {
+    return referred.computeContentSummary(countsMap);
+  }
+
+  @Override
+  public final Content.Counts computeContentSummary(Content.Counts counts) {
+    return referred.computeContentSummary(counts);
+  }
+
+  @Override
+  public final Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache) {
+    return referred.computeQuotaUsage(counts, useCache);
+  }
+  
+  @Override
+  public final INode getSnapshotINode(Snapshot snapshot) {
+    return referred.getSnapshotINode(snapshot);
+  }
+
+  @Override
+  public final void addSpaceConsumed(long nsDelta, long dsDelta
+      ) throws QuotaExceededException {
+    referred.addSpaceConsumed(nsDelta, dsDelta);
+  }
+
+  @Override
+  public final long getNsQuota() {
+    return referred.getNsQuota();
+  }
+
+  @Override
+  public final long getDsQuota() {
+    return referred.getDsQuota();
+  }
+  
+  @Override
+  public final void clear() {
+    super.clear();
+    referred.clear();
+    referred = null;
+  }
+
+  /** An anonymous reference with reference count. */
+  public static class WithCount extends INodeReference {
+    private int referenceCount = 0;
+    
+    WithCount(INode inode) {
+      super(inode);
+    }
+    
+    public int incrementReferenceCount() {
+      return ++referenceCount;
+    }
+
+    public int decrementReferenceCount() {
+      return --referenceCount;
+    }
+  }
+
+  /** A reference with a fixed name. */
+  public static class WithName extends INodeReference {
+
+    private final byte[] name;
+
+    public WithName(WithCount referred, byte[] name) {
+      super(referred);
+      this.name = name;
+    }
+
+    @Override
+    public final byte[] getLocalNameBytes() {
+      return name;
+    }
+
+    @Override
+    public final void setLocalName(byte[] name) {
+      throw new UnsupportedOperationException("Cannot set name: " + getClass()
+          + " is immutable.");
+    }
+  }
+}
\ No newline at end of file
index a171342..f0f1aac 100644 (file)
@@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
  * An {@link INode} representing a symbolic link.
  */
 @InterfaceAudience.Private
-public class INodeSymlink extends INode {
+public class INodeSymlink extends INodeWithAdditionalFields {
   private final byte[] symlink; // The target URI
 
   INodeSymlink(long id, byte[] name, PermissionStatus permissions,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeWithAdditionalFields.java
new file mode 100644 (file)
index 0000000..6327e98
--- /dev/null
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link INode} with additional fields including id, name, permission,
+ * access time and modification time.
+ */
+@InterfaceAudience.Private
+public abstract class INodeWithAdditionalFields extends INode {
+  private static enum PermissionStatusFormat {
+    MODE(0, 16),
+    GROUP(MODE.OFFSET + MODE.LENGTH, 25),
+    USER(GROUP.OFFSET + GROUP.LENGTH, 23);
+
+    final int OFFSET;
+    final int LENGTH; //bit length
+    final long MASK;
+
+    PermissionStatusFormat(int offset, int length) {
+      OFFSET = offset;
+      LENGTH = length;
+      MASK = ((-1L) >>> (64 - LENGTH)) << OFFSET;
+    }
+
+    long retrieve(long record) {
+      return (record & MASK) >>> OFFSET;
+    }
+
+    long combine(long bits, long record) {
+      return (record & ~MASK) | (bits << OFFSET);
+    }
+
+    /** Encode the {@link PermissionStatus} to a long. */
+    static long toLong(PermissionStatus ps) {
+      long permission = 0L;
+      final int user = SerialNumberManager.INSTANCE.getUserSerialNumber(
+          ps.getUserName());
+      permission = USER.combine(user, permission);
+      final int group = SerialNumberManager.INSTANCE.getGroupSerialNumber(
+          ps.getGroupName());
+      permission = GROUP.combine(group, permission);
+      final int mode = ps.getPermission().toShort();
+      permission = MODE.combine(mode, permission);
+      return permission;
+    }
+  }
+
+  /** The inode id. */
+  final private long id;
+  /**
+   *  The inode name is in java UTF8 encoding; 
+   *  The name in HdfsFileStatus should keep the same encoding as this.
+   *  if this encoding is changed, implicitly getFileInfo and listStatus in
+   *  clientProtocol are changed; The decoding at the client
+   *  side should change accordingly.
+   */
+  private byte[] name = null;
+  /** 
+   * Permission encoded using {@link PermissionStatusFormat}.
+   * Codes other than {@link #clonePermissionStatus(INodeWithAdditionalFields)}
+   * and {@link #updatePermissionStatus(PermissionStatusFormat, long)}
+   * should not modify it.
+   */
+  private long permission = 0L;
+  /** The last modification time*/
+  private long modificationTime = 0L;
+  /** The last access time*/
+  private long accessTime = 0L;
+
+  private INodeWithAdditionalFields(INode parent, long id, byte[] name,
+      long permission, long modificationTime, long accessTime) {
+    super(parent);
+    this.id = id;
+    this.name = name;
+    this.permission = permission;
+    this.modificationTime = modificationTime;
+    this.accessTime = accessTime;
+  }
+
+  INodeWithAdditionalFields(long id, byte[] name, PermissionStatus permissions,
+      long modificationTime, long accessTime) {
+    this(null, id, name, PermissionStatusFormat.toLong(permissions),
+        modificationTime, accessTime);
+  }
+  
+  /** @param other Other node to be copied */
+  INodeWithAdditionalFields(INodeWithAdditionalFields other) {
+    this(other.getParent(), other.getId(), other.getLocalNameBytes(),
+        other.permission, other.modificationTime, other.accessTime);
+  }
+
+  /** Get inode id */
+  public final long getId() {
+    return this.id;
+  }
+
+  @Override
+  public final byte[] getLocalNameBytes() {
+    return name;
+  }
+  
+  @Override
+  public final void setLocalName(byte[] name) {
+    this.name = name;
+  }
+
+  /** Clone the {@link PermissionStatus}. */
+  final void clonePermissionStatus(INodeWithAdditionalFields that) {
+    this.permission = that.permission;
+  }
+
+  @Override
+  final PermissionStatus getPermissionStatus(Snapshot snapshot) {
+    return new PermissionStatus(getUserName(snapshot), getGroupName(snapshot),
+        getFsPermission(snapshot));
+  }
+
+  private final void updatePermissionStatus(PermissionStatusFormat f, long n) {
+    this.permission = f.combine(n, permission);
+  }
+
+  @Override
+  final String getUserName(Snapshot snapshot) {
+    if (snapshot != null) {
+      return getSnapshotINode(snapshot).getUserName();
+    }
+
+    int n = (int)PermissionStatusFormat.USER.retrieve(permission);
+    return SerialNumberManager.INSTANCE.getUser(n);
+  }
+
+  @Override
+  final void setUser(String user) {
+    int n = SerialNumberManager.INSTANCE.getUserSerialNumber(user);
+    updatePermissionStatus(PermissionStatusFormat.USER, n);
+  }
+
+  @Override
+  final String getGroupName(Snapshot snapshot) {
+    if (snapshot != null) {
+      return getSnapshotINode(snapshot).getGroupName();
+    }
+
+    int n = (int)PermissionStatusFormat.GROUP.retrieve(permission);
+    return SerialNumberManager.INSTANCE.getGroup(n);
+  }
+
+  @Override
+  final void setGroup(String group) {
+    int n = SerialNumberManager.INSTANCE.getGroupSerialNumber(group);
+    updatePermissionStatus(PermissionStatusFormat.GROUP, n);
+  }
+
+  @Override
+  final FsPermission getFsPermission(Snapshot snapshot) {
+    if (snapshot != null) {
+      return getSnapshotINode(snapshot).getFsPermission();
+    }
+
+    return new FsPermission(
+        (short)PermissionStatusFormat.MODE.retrieve(permission));
+  }
+
+  final short getFsPermissionShort() {
+    return (short)PermissionStatusFormat.MODE.retrieve(permission);
+  }
+  @Override
+  void setPermission(FsPermission permission) {
+    final short mode = permission.toShort();
+    updatePermissionStatus(PermissionStatusFormat.MODE, mode);
+  }
+
+  @Override
+  final long getModificationTime(Snapshot snapshot) {
+    if (snapshot != null) {
+      return getSnapshotINode(snapshot).getModificationTime(null);
+    }
+
+    return this.modificationTime;
+  }
+
+
+  /** Update modification time if it is larger than the current value. */
+  public final INode updateModificationTime(long mtime, Snapshot latest)
+      throws QuotaExceededException {
+    Preconditions.checkState(isDirectory());
+    if (mtime <= modificationTime) {
+      return this;
+    }
+    return setModificationTime(mtime, latest);
+  }
+
+  final void cloneModificationTime(INodeWithAdditionalFields that) {
+    this.modificationTime = that.modificationTime;
+  }
+
+  @Override
+  public final void setModificationTime(long modificationTime) {
+    this.modificationTime = modificationTime;
+  }
+
+  @Override
+  final long getAccessTime(Snapshot snapshot) {
+    if (snapshot != null) {
+      return getSnapshotINode(snapshot).getAccessTime(null);
+    }
+
+    return accessTime;
+  }
+
+  /**
+   * Set last access time of inode.
+   */
+  public final void setAccessTime(long accessTime) {
+    this.accessTime = accessTime;
+  }
+}
index 0054fde..37c7b33 100644 (file)
@@ -94,7 +94,7 @@ abstract class AbstractINodeDiffList<N extends INode,
         if (previous.snapshotINode == null) {
           previous.snapshotINode = removed.snapshotINode;
         } else if (removed.snapshotINode != null) {
-          removed.snapshotINode.clearReferences();
+          removed.snapshotINode.clear();
         }
         counts.add(previous.combinePosteriorAndCollectBlocks(
             currentINode, removed, collectedBlocks));
index 798047a..014eecf 100644 (file)
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.util.Diff.ListType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.util.Time;
 
@@ -418,8 +419,9 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
       ReadOnlyList<INode> children = dir.getChildrenList(diffReport
           .isFromEarlier() ? diffReport.to : diffReport.from);
       for (INode child : children) {
-        if (diff.searchCreated(child.getLocalNameBytes()) == null
-            && diff.searchDeleted(child.getLocalNameBytes()) == null) {
+        final byte[] name = child.getLocalNameBytes();
+        if (diff.searchIndex(ListType.CREATED, name) < 0
+            && diff.searchIndex(ListType.DELETED, name) < 0) {
           computeDiffRecursively(child, diffReport);
         }
       }
index 5ff074d..0bb637f 100644 (file)
@@ -34,9 +34,11 @@ import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference;
 import org.apache.hadoop.hdfs.server.namenode.Quota;
 import org.apache.hadoop.hdfs.util.Diff;
 import org.apache.hadoop.hdfs.util.Diff.Container;
+import org.apache.hadoop.hdfs.util.Diff.ListType;
 import org.apache.hadoop.hdfs.util.Diff.UndoInfo;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
@@ -59,12 +61,25 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       super(created, deleted);
     }
 
-    private final INode setCreatedChild(final int c, final INode newChild) {
-      return getCreatedList().set(c, newChild);
+    /**
+     * Replace the given child from the created/deleted list.
+     * @return true if the child is replaced; false if the child is not found.
+     */
+    private final boolean replace(final ListType type,
+        final INode oldChild, final INode newChild) {
+      final List<INode> list = getList(type); 
+      final int i = search(list, oldChild.getLocalNameBytes());
+      if (i < 0) {
+        return false;
+      }
+
+      final INode removed = list.set(i, newChild);
+      Preconditions.checkState(removed == oldChild);
+      return true;
     }
 
     private final boolean removeCreatedChild(final int c, final INode child) {
-      final List<INode> created = getCreatedList();
+      final List<INode> created = getList(ListType.CREATED);
       if (created.get(c) == child) {
         final INode removed = created.remove(c);
         Preconditions.checkState(removed == child);
@@ -77,7 +92,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     private void destroyCreatedList(
         final INodeDirectoryWithSnapshot currentINode,
         final BlocksMapUpdateInfo collectedBlocks) {
-      List<INode> createdList = getCreatedList();
+      final List<INode> createdList = getList(ListType.CREATED);
       for (INode c : createdList) {
         c.destroyAndCollectBlocks(collectedBlocks);
         // c should be contained in the children list, remove it
@@ -90,10 +105,12 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     private Quota.Counts destroyDeletedList(
         final BlocksMapUpdateInfo collectedBlocks) {
       Quota.Counts counts = Quota.Counts.newInstance();
-      List<INode> deletedList = getDeletedList();
+      final List<INode> deletedList = getList(ListType.DELETED);
       for (INode d : deletedList) {
-        d.computeQuotaUsage(counts, false);
-        d.destroyAndCollectBlocks(collectedBlocks);
+        if (INodeReference.tryRemoveReference(d) <= 0) {
+          d.computeQuotaUsage(counts, false);
+          d.destroyAndCollectBlocks(collectedBlocks);
+        }
       }
       deletedList.clear();
       return counts;
@@ -101,7 +118,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     
     /** Serialize {@link #created} */
     private void writeCreated(DataOutputStream out) throws IOException {
-      final List<INode> created = getCreatedList();
+      final List<INode> created = getList(ListType.CREATED);
       out.writeInt(created.size());
       for (INode node : created) {
         // For INode in created list, we only need to record its local name 
@@ -113,7 +130,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     
     /** Serialize {@link #deleted} */
     private void writeDeleted(DataOutputStream out) throws IOException {
-      final List<INode> deleted = getDeletedList();
+      final List<INode> deleted = getList(ListType.DELETED);
       out.writeInt(deleted.size());
       for (INode node : deleted) {
         FSImageSerialization.saveINode2Image(node, out, true);
@@ -129,7 +146,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     /** @return The list of INodeDirectory contained in the deleted list */
     private List<INodeDirectory> getDirsInDeleted() {
       List<INodeDirectory> dirList = new ArrayList<INodeDirectory>();
-      for (INode node : getDeletedList()) {
+      for (INode node : getList(ListType.DELETED)) {
         if (node.isDirectory()) {
           dirList.add(node.asDirectory());
         }
@@ -151,8 +168,8 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       List<DiffReportEntry> cList = new ArrayList<DiffReportEntry>();
       List<DiffReportEntry> dList = new ArrayList<DiffReportEntry>();
       int c = 0, d = 0;
-      List<INode> created = getCreatedList();
-      List<INode> deleted = getDeletedList();
+      List<INode> created = getList(ListType.CREATED);
+      List<INode> deleted = getList(ListType.DELETED);
       byte[][] parentPath = parent.getRelativePathNameBytes(root);
       byte[][] fullPath = new byte[parentPath.length + 1][];
       System.arraycopy(parentPath, 0, fullPath, 0, parentPath.length);
@@ -243,8 +260,10 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
         @Override
         public void process(INode inode) {
           if (inode != null) {
-            inode.computeQuotaUsage(counts, false);
-            inode.destroyAndCollectBlocks(collectedBlocks);
+            if (INodeReference.tryRemoveReference(inode) <= 0) {
+              inode.computeQuotaUsage(counts, false);
+              inode.destroyAndCollectBlocks(collectedBlocks);
+            }
           }
         }
       });
@@ -373,6 +392,19 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     DirectoryDiffList() {
       setFactory(DirectoryDiffFactory.INSTANCE);
     }
+    
+    /** Replace the given child in the created/deleted list, if there is any. */
+    private boolean replaceChild(final ListType type, final INode oldChild,
+        final INode newChild) {
+      final List<DirectoryDiff> diffList = asList();
+      for(int i = diffList.size() - 1; i >= 0; i--) {
+        final ChildrenDiff diff = diffList.get(i).diff;
+        if (diff.replace(type, oldChild, newChild)) {
+          return true;
+        }
+      }
+      return false;
+    }
   }
 
   /**
@@ -550,7 +582,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     final List<DirectoryDiff> diffList = diffs.asList();
     for(int i = diffList.size() - 1; i >= 0; i--) {
       final ChildrenDiff diff = diffList.get(i).diff;
-      final int c = diff.searchCreatedIndex(name);
+      final int c = diff.searchIndex(ListType.CREATED, name);
       if (c >= 0) {
         if (diff.removeCreatedChild(c, child)) {
           return true;
@@ -563,19 +595,20 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
   @Override
   public void replaceChild(final INode oldChild, final INode newChild) {
     super.replaceChild(oldChild, newChild);
+    diffs.replaceChild(ListType.CREATED, oldChild, newChild);
+  }
 
-    // replace same child in the created list, if there is any.
-    final byte[] name = oldChild.getLocalNameBytes();
-    final List<DirectoryDiff> diffList = diffs.asList();
-    for(int i = diffList.size() - 1; i >= 0; i--) {
-      final ChildrenDiff diff = diffList.get(i).diff;
-      final int c = diff.searchCreatedIndex(name);
-      if (c >= 0) {
-        final INode removed = diff.setCreatedChild(c, newChild);
-        Preconditions.checkState(removed == oldChild);
-        return;
-      }
-    }
+  /** The child just has been removed, replace it with a reference. */
+  public INodeReference.WithName replaceRemovedChild4Reference(
+      INode oldChild, INodeReference.WithCount newChild, byte[] childName) {
+    final INodeReference.WithName ref = new INodeReference.WithName(
+        newChild, childName);
+    newChild.incrementReferenceCount();
+    diffs.replaceChild(ListType.CREATED, oldChild, ref);
+    // the old child must be in the deleted list
+    Preconditions.checkState(
+        diffs.replaceChild(ListType.DELETED, oldChild, ref));
+    return ref;
   }
 
   @Override
@@ -640,7 +673,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
       if (prior != null) {
         DirectoryDiff priorDiff = this.getDiffs().getDiff(prior);
         if (priorDiff != null) {
-          for (INode cNode : priorDiff.getChildrenDiff().getCreatedList()) {
+          for (INode cNode : priorDiff.getChildrenDiff().getList(ListType.CREATED)) {
             counts.add(cNode.cleanSubtree(snapshot, null, collectedBlocks));
           }
         }
@@ -671,7 +704,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     super.computeQuotaUsage4CurrentDirectory(counts);
 
     for(DirectoryDiff d : diffs) {
-      for(INode deleted : d.getChildrenDiff().getDeletedList()) {
+      for(INode deleted : d.getChildrenDiff().getList(ListType.DELETED)) {
         deleted.computeQuotaUsage(counts, false);
       }
     }
@@ -696,7 +729,7 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
 
   private void computeContentSummary4Snapshot(final Content.Counts counts) {
     for(DirectoryDiff d : diffs) {
-      for(INode deleted : d.getChildrenDiff().getDeletedList()) {
+      for(INode deleted : d.getChildrenDiff().getList(ListType.DELETED)) {
         deleted.computeContentSummary(counts);
       }
     }
index 12d6f23..2db7b65 100644 (file)
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiff
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.DirectoryDiff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.DirectoryDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
+import org.apache.hadoop.hdfs.util.Diff.ListType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
 /**
@@ -141,7 +142,8 @@ public class SnapshotFSImageFormat {
     // the INode in the created list should be a reference to another INode
     // in posterior SnapshotDiffs or one of the current children
     for (DirectoryDiff postDiff : parent.getDiffs()) {
-      INode d = postDiff.getChildrenDiff().searchDeleted(createdNodeName);
+      final INode d = postDiff.getChildrenDiff().search(ListType.DELETED,
+          createdNodeName);
       if (d != null) {
         return d;
       } // else go to the next SnapshotDiff
index a602733..b8ec5ca 100644 (file)
@@ -73,6 +73,10 @@ import com.google.common.base.Preconditions;
  * @param <E> The element type, which must implement {@link Element} interface.
  */
 public class Diff<K, E extends Diff.Element<K>> {
+  public static enum ListType {
+    CREATED, DELETED
+  }
+
   /** An interface for the elements in a {@link Diff}. */
   public static interface Element<K> extends Comparable<K> {
     /** @return the key of this object. */
@@ -153,35 +157,23 @@ public class Diff<K, E extends Diff.Element<K>> {
   }
 
   /** @return the created list, which is never null. */
-  public List<E> getCreatedList() {
-    return created == null? Collections.<E>emptyList(): created;
+  public List<E> getList(final ListType type) {
+    final List<E> list = type == ListType.CREATED? created: deleted;
+    return list == null? Collections.<E>emptyList(): list;
   }
 
-  /** @return the deleted list, which is never null. */
-  public List<E> getDeletedList() {
-    return deleted == null? Collections.<E>emptyList(): deleted;
+  public int searchIndex(final ListType type, final K name) {
+    return search(getList(type), name);
   }
 
-  public int searchCreatedIndex(final K name) {
-    return search(created, name);
-  }
-
-  /**
-   * @return null if the element is not found;
-   *         otherwise, return the element in the c-list.
-   */
-  public E searchCreated(final K name) {
-    final int c = searchCreatedIndex(name);
-    return c < 0 ? null : created.get(c);
-  }
-  
   /**
    * @return null if the element is not found;
-   *         otherwise, return the element in the d-list.
+   *         otherwise, return the element in the created/deleted list.
    */
-  public E searchDeleted(final K name) {
-    final int d = search(deleted, name);
-    return d < 0 ? null : deleted.get(d);
+  public E search(final ListType type, final K name) {
+    final List<E> list = getList(type); 
+    final int c = search(list, name);
+    return c < 0 ? null : list.get(c);
   }
   
   /** @return true if no changes contained in the diff */
@@ -191,35 +183,25 @@ public class Diff<K, E extends Diff.Element<K>> {
   }
   
   /**
-   * Insert the element to created.
-   * @param i the insertion point defined
-   *          in {@link Collections#binarySearch(List, Object)}
-   */
-  private void insertCreated(final E element, final int i) {
-    if (i >= 0) {
-      throw new AssertionError("Element already exists: element=" + element
-          + ", created=" + created);
-    }
-    if (created == null) {
-      created = new ArrayList<E>(DEFAULT_ARRAY_INITIAL_CAPACITY);
-    }
-    created.add(-i - 1, element);
-  }
-
-  /**
-   * Insert the element to deleted.
+   * Insert the given element to the created/deleted list.
    * @param i the insertion point defined
    *          in {@link Collections#binarySearch(List, Object)}
    */
-  private void insertDeleted(final E element, final int i) {
+  private void insert(final ListType type, final E element, final int i) {
+    List<E> list = type == ListType.CREATED? created: deleted; 
     if (i >= 0) {
       throw new AssertionError("Element already exists: element=" + element
-          + ", deleted=" + deleted);
+          + ", " + type + "=" + list);
     }
-    if (deleted == null) {
-      deleted = new ArrayList<E>(DEFAULT_ARRAY_INITIAL_CAPACITY);
+    if (list == null) {
+      list = new ArrayList<E>(DEFAULT_ARRAY_INITIAL_CAPACITY);
+      if (type == ListType.CREATED) {
+        created = list;
+      } else if (type == ListType.DELETED){
+        deleted = list;
+      }
     }
-    deleted.add(-i - 1, element);
+    list.add(-i - 1, element);
   }
 
   /**
@@ -228,7 +210,7 @@ public class Diff<K, E extends Diff.Element<K>> {
    */
   public int create(final E element) {
     final int c = search(created, element.getKey());
-    insertCreated(element, c);
+    insert(ListType.CREATED, element, c);
     return c;
   }
 
@@ -254,7 +236,7 @@ public class Diff<K, E extends Diff.Element<K>> {
     } else {
       // not in c-list, it must be in previous
       d = search(deleted, element.getKey());
-      insertDeleted(element, d);
+      insert(ListType.DELETED, element, d);
     }
     return new UndoInfo<E>(c, previous, d);
   }
@@ -295,8 +277,8 @@ public class Diff<K, E extends Diff.Element<K>> {
       d = search(deleted, oldElement.getKey());
       if (d < 0) {
         // Case 2.3: neither in c-list nor d-list
-        insertCreated(newElement, c);
-        insertDeleted(oldElement, d);
+        insert(ListType.CREATED, newElement, c);
+        insert(ListType.DELETED, oldElement, d);
       }
     }
     return new UndoInfo<E>(c, previous, d);
@@ -365,7 +347,8 @@ public class Diff<K, E extends Diff.Element<K>> {
    * @return the current state of the list.
    */
   public List<E> apply2Previous(final List<E> previous) {
-    return apply2Previous(previous, getCreatedList(), getDeletedList());
+    return apply2Previous(previous,
+        getList(ListType.CREATED), getList(ListType.DELETED));
   }
 
   private static <K, E extends Diff.Element<K>> List<E> apply2Previous(
@@ -387,7 +370,8 @@ public class Diff<K, E extends Diff.Element<K>> {
    * @return the previous state of the list.
    */
   public List<E> apply2Current(final List<E> current) {
-    return apply2Previous(current, getDeletedList(), getCreatedList());
+    return apply2Previous(current,
+        getList(ListType.DELETED), getList(ListType.CREATED));
   }
   
   /**
@@ -422,8 +406,8 @@ public class Diff<K, E extends Diff.Element<K>> {
    */
   public void combinePosterior(final Diff<K, E> posterior,
       final Processor<E> deletedProcesser) {
-    final Iterator<E> createdIterator = posterior.getCreatedList().iterator();
-    final Iterator<E> deletedIterator = posterior.getDeletedList().iterator();
+    final Iterator<E> createdIterator = posterior.getList(ListType.CREATED).iterator();
+    final Iterator<E> deletedIterator = posterior.getList(ListType.DELETED).iterator();
 
     E c = createdIterator.hasNext()? createdIterator.next(): null;
     E d = deletedIterator.hasNext()? deletedIterator.next(): null;
@@ -458,7 +442,7 @@ public class Diff<K, E extends Diff.Element<K>> {
   @Override
   public String toString() {
     return getClass().getSimpleName()
-        +  "{created=" + getCreatedList()
-        + ", deleted=" + getDeletedList() + "}";
+        +  "{created=" + getList(ListType.CREATED)
+        + ", deleted=" + getList(ListType.DELETED) + "}";
   }
 }