HDDS-941. Rename ChunkGroupInputStream to keyInputStream and ChunkInputStream to...
authorBharat Viswanadham <bharat@apache.org>
Thu, 10 Jan 2019 04:02:36 +0000 (20:02 -0800)
committerBharat Viswanadham <bharat@apache.org>
Thu, 10 Jan 2019 04:02:36 +0000 (20:02 -0800)
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java [moved from hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java with 97% similarity]
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java [moved from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java with 90% similarity]
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneInputStream.java
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSInputStream.java
hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java

@@ -46,7 +46,7 @@ import java.util.List;
  * instances.  This class encapsulates all state management for iterating
  * through the sequence of chunks and the sequence of buffers within each chunk.
  */
-public class ChunkInputStream extends InputStream implements Seekable {
+public class BlockInputStream extends InputStream implements Seekable {
 
   private static final int EOF = -1;
 
@@ -61,7 +61,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
   private int bufferIndex;
 
   /**
-   * Creates a new ChunkInputStream.
+   * Creates a new BlockInputStream.
    *
    * @param blockID block ID of the chunk
    * @param xceiverClientManager client manager that controls client
@@ -69,7 +69,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
    * @param chunks list of chunks to read
    * @param traceID container protocol call traceID
    */
-  public ChunkInputStream(
+  public BlockInputStream(
       BlockID blockID, XceiverClientManager xceiverClientManager,
       XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) {
     this.blockID = blockID;
@@ -79,7 +79,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
     this.chunks = chunks;
     this.chunkIndex = -1;
     // chunkOffset[i] stores offset at which chunk i stores data in
-    // ChunkInputStream
+    // BlockInputStream
     this.chunkOffset = new long[this.chunks.size()];
     initializeChunkOffset();
     this.buffers = null;
@@ -154,7 +154,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
    */
   private synchronized void checkOpen() throws IOException {
     if (xceiverClient == null) {
-      throw new IOException("ChunkInputStream has been closed.");
+      throw new IOException("BlockInputStream has been closed.");
     }
   }
 
@@ -30,7 +30,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
 import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
@@ -44,17 +44,17 @@ import java.util.Arrays;
 import java.util.List;
 
 /**
- * Maintaining a list of ChunkInputStream. Read based on offset.
+ * Maintaining a list of BlockInputStream. Read based on offset.
  */
-public class ChunkGroupInputStream extends InputStream implements Seekable {
+public class KeyInputStream extends InputStream implements Seekable {
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(ChunkGroupInputStream.class);
+      LoggerFactory.getLogger(KeyInputStream.class);
 
   private static final int EOF = -1;
 
   private final ArrayList<ChunkInputStreamEntry> streamEntries;
-  // streamOffset[i] stores the offset at which chunkInputStream i stores
+  // streamOffset[i] stores the offset at which blockInputStream i stores
   // data in the key
   private long[] streamOffset = null;
   private int currentStreamIndex;
@@ -62,7 +62,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
   private boolean closed = false;
   private String key;
 
-  public ChunkGroupInputStream() {
+  public KeyInputStream() {
     streamEntries = new ArrayList<>();
     currentStreamIndex = 0;
   }
@@ -84,7 +84,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
    * @param streamLength the max number of bytes that should be written to this
    *                     stream.
    */
-  public synchronized void addStream(ChunkInputStream stream,
+  public synchronized void addStream(BlockInputStream stream,
       long streamLength) {
     streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
   }
@@ -129,7 +129,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
         // this case.
         throw new IOException(String.format(
             "Inconsistent read for blockID=%s length=%d numBytesRead=%d",
-            current.chunkInputStream.getBlockID(), current.length,
+            current.blockInputStream.getBlockID(), current.length,
             numBytesRead));
       }
       totalReadLen += numBytesRead;
@@ -174,7 +174,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
       // accordingly so that currentStreamIndex = insertionPoint - 1
       currentStreamIndex = -currentStreamIndex - 2;
     }
-    // seek to the proper offset in the ChunkInputStream
+    // seek to the proper offset in the BlockInputStream
     streamEntries.get(currentStreamIndex)
         .seek(pos - streamOffset[currentStreamIndex]);
   }
@@ -207,17 +207,17 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
   }
 
   /**
-   * Encapsulates ChunkInputStream.
+   * Encapsulates BlockInputStream.
    */
   public static class ChunkInputStreamEntry extends InputStream
       implements Seekable {
 
-    private final ChunkInputStream chunkInputStream;
+    private final BlockInputStream blockInputStream;
     private final long length;
 
-    public ChunkInputStreamEntry(ChunkInputStream chunkInputStream,
+    public ChunkInputStreamEntry(BlockInputStream blockInputStream,
         long length) {
-      this.chunkInputStream = chunkInputStream;
+      this.blockInputStream = blockInputStream;
       this.length = length;
     }
 
@@ -228,29 +228,29 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
     @Override
     public synchronized int read(byte[] b, int off, int len)
         throws IOException {
-      int readLen = chunkInputStream.read(b, off, len);
+      int readLen = blockInputStream.read(b, off, len);
       return readLen;
     }
 
     @Override
     public synchronized int read() throws IOException {
-      int data = chunkInputStream.read();
+      int data = blockInputStream.read();
       return data;
     }
 
     @Override
     public synchronized void close() throws IOException {
-      chunkInputStream.close();
+      blockInputStream.close();
     }
 
     @Override
     public void seek(long pos) throws IOException {
-      chunkInputStream.seek(pos);
+      blockInputStream.seek(pos);
     }
 
     @Override
     public long getPos() throws IOException {
-      return chunkInputStream.getPos();
+      return blockInputStream.getPos();
     }
 
     @Override
@@ -267,7 +267,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
       String requestId) throws IOException {
     long length = 0;
     long containerKey;
-    ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream();
+    KeyInputStream groupInputStream = new KeyInputStream();
     groupInputStream.key = keyInfo.getKeyName();
     List<OmKeyLocationInfo> keyLocationInfos =
         keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
@@ -304,7 +304,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
           length += chunk.getLen();
         }
         success = true;
-        ChunkInputStream inputStream = new ChunkInputStream(
+        BlockInputStream inputStream = new BlockInputStream(
             omKeyLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
             chunks, requestId);
         groupInputStream.addStream(inputStream,
index 1c82ef4..66e4199 100644 (file)
@@ -50,7 +50,7 @@ import java.util.ListIterator;
 import java.util.concurrent.TimeoutException;
 
 /**
- * Maintaining a list of ChunkInputStream. Write based on offset.
+ * Maintaining a list of BlockInputStream. Write based on offset.
  *
  * Note that this may write to multiple containers in one write call. In case
  * that first container succeeded but later ones failed, the succeeded writes
index e1f65e6..92eb150 100644 (file)
 
 package org.apache.hadoop.ozone.client.io;
 
-import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 /**
  * OzoneInputStream is used to read data from Ozone.
- * It uses SCM's {@link ChunkInputStream} for reading the data.
+ * It uses {@link KeyInputStream} for reading the data.
  */
 public class OzoneInputStream extends InputStream {
 
   private final InputStream inputStream;
 
   /**
-   * Constructs OzoneInputStream with ChunkInputStream.
+   * Constructs OzoneInputStream with KeyInputStream.
    *
    * @param inputStream
    */
index b6fb6b5..bd80b3e 100644 (file)
@@ -38,7 +38,7 @@ import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.client.VolumeArgs;
-import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
@@ -541,7 +541,7 @@ public class RpcClient implements ClientProtocol {
         .build();
     OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
     LengthInputStream lengthInputStream =
-        ChunkGroupInputStream.getFromOmKeyInfo(
+        KeyInputStream.getFromOmKeyInfo(
             keyInfo, xceiverClientManager, storageContainerLocationClient,
             requestId);
     return new OzoneInputStream(lengthInputStream.getWrappedStream());
index a078c01..b73f297 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.common.Checksum;
@@ -38,7 +39,6 @@ import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTrans
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneConsts.Versioning;
-import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
@@ -475,7 +475,7 @@ public final class DistributedStorageHandler implements StorageHandler {
         .setDataSize(args.getSize())
         .build();
     OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
-    return ChunkGroupInputStream.getFromOmKeyInfo(
+    return KeyInputStream.getFromOmKeyInfo(
         keyInfo, xceiverClientManager, storageContainerLocationClient,
         args.getRequestID());
   }
index 4ef46a3..f3ab093 100644 (file)
@@ -17,8 +17,8 @@
 package org.apache.hadoop.ozone.om;
 
 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
-import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -31,7 +31,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 
 /**
- * This class tests ChunkGroupInputStream and KeyOutputStream.
+ * This class tests KeyInputStream and KeyOutputStream.
  */
 public class TestChunkStreams {
 
@@ -40,15 +40,15 @@ public class TestChunkStreams {
 
   @Test
   public void testReadGroupInputStream() throws Exception {
-    try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
+    try (KeyInputStream groupInputStream = new KeyInputStream()) {
 
       String dataString = RandomStringUtils.randomAscii(500);
       byte[] buf = dataString.getBytes(UTF_8);
       int offset = 0;
       for (int i = 0; i < 5; i++) {
         int tempOffset = offset;
-        ChunkInputStream in =
-            new ChunkInputStream(null, null, null, new ArrayList<>(), null) {
+        BlockInputStream in =
+            new BlockInputStream(null, null, null, new ArrayList<>(), null) {
               private long pos = 0;
               private ByteArrayInputStream in =
                   new ByteArrayInputStream(buf, tempOffset, 100);
@@ -96,15 +96,15 @@ public class TestChunkStreams {
 
   @Test
   public void testErrorReadGroupInputStream() throws Exception {
-    try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
+    try (KeyInputStream groupInputStream = new KeyInputStream()) {
 
       String dataString = RandomStringUtils.randomAscii(500);
       byte[] buf = dataString.getBytes(UTF_8);
       int offset = 0;
       for (int i = 0; i < 5; i++) {
         int tempOffset = offset;
-        ChunkInputStream in =
-            new ChunkInputStream(null, null, null, new ArrayList<>(), null) {
+        BlockInputStream in =
+            new BlockInputStream(null, null, null, new ArrayList<>(), null) {
               private long pos = 0;
               private ByteArrayInputStream in =
                   new ByteArrayInputStream(buf, tempOffset, 100);
index 4c5c0c8..5df3cff 100644 (file)
@@ -21,7 +21,7 @@ package org.apache.hadoop.fs.ozone;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -36,10 +36,10 @@ import java.io.InputStream;
 @InterfaceStability.Evolving
 public final class OzoneFSInputStream extends FSInputStream {
 
-  private final ChunkGroupInputStream inputStream;
+  private final KeyInputStream inputStream;
 
   public OzoneFSInputStream(InputStream inputStream) {
-    this.inputStream = (ChunkGroupInputStream)inputStream;
+    this.inputStream = (KeyInputStream)inputStream;
   }
 
   @Override
index 654962b..9efcc87 100644 (file)
 package org.apache.hadoop.ozone.s3.io;
 
 import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 /**
- * S3Wrapper Input Stream which encapsulates ChunkGroupInputStream from ozone.
+ * S3Wrapper Input Stream which encapsulates KeyInputStream from ozone.
  */
 public class S3WrapperInputStream extends FSInputStream {
-  private final ChunkGroupInputStream inputStream;
+  private final KeyInputStream inputStream;
 
   /**
-   * Constructs S3WrapperInputStream with ChunkInputStream.
+   * Constructs S3WrapperInputStream with KeyInputStream.
    *
    * @param inputStream
    */
   public S3WrapperInputStream(InputStream inputStream) {
-    this.inputStream = (ChunkGroupInputStream) inputStream;
+    this.inputStream = (KeyInputStream) inputStream;
   }
 
   @Override