HBASE-27279 Addendum fix TestSaslTlsIPCRejectPlainText
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / io / hfile / HFileBlock.java
index d054e83ac19f426a0f4a33e2d5d23998576341f8..8e04580874fe5e71e3c29cd8d46d8764e66029c8 100644 (file)
 package org.apache.hadoop.hbase.io.hfile;
 
 import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
-
+import static org.apache.hadoop.hbase.io.hfile.BlockCompressedSizePredicator.BLOCK_COMPRESSED_SIZE_PREDICATOR;
+import static org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
@@ -26,6 +33,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -46,15 +54,18 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
 import org.apache.hadoop.hbase.io.util.BlockIOUtils;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.regionserver.ShipperListener;
+import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -418,7 +429,23 @@ public class HFileBlock implements Cacheable {
     return buf.release();
   }
 
-  /** @return get data block encoding id that was used to encode this block */
+  /**
+   * Calling this method in strategic locations where HFileBlocks are referenced may help diagnose
+   * potential buffer leaks. We pass the block itself as a default hint, but one can use
+   * {@link #touch(Object)} to pass their own hint as well.
+   */
+  @Override
+  public HFileBlock touch() {
+    return touch(this);
+  }
+
+  @Override
+  public HFileBlock touch(Object hint) {
+    buf.touch(hint);
+    return this;
+  }
+
+  /** Returns get data block encoding id that was used to encode this block */
   short getDataBlockEncodingId() {
     if (blockType != BlockType.ENCODED_DATA) {
       throw new IllegalArgumentException("Querying encoder ID of a block " + "of type other than "
@@ -427,30 +454,22 @@ public class HFileBlock implements Cacheable {
     return buf.getShort(headerSize());
   }
 
-  /**
-   * @return the on-disk size of header + data part + checksum.
-   */
+  /** Returns the on-disk size of header + data part + checksum. */
   public int getOnDiskSizeWithHeader() {
     return onDiskSizeWithoutHeader + headerSize();
   }
 
-  /**
-   * @return the on-disk size of the data part + checksum (header excluded).
-   */
+  /** Returns the on-disk size of the data part + checksum (header excluded). */
   int getOnDiskSizeWithoutHeader() {
     return onDiskSizeWithoutHeader;
   }
 
-  /**
-   * @return the uncompressed size of data part (header and checksum excluded).
-   */
-  int getUncompressedSizeWithoutHeader() {
+  /** Returns the uncompressed size of data part (header and checksum excluded). */
+  public int getUncompressedSizeWithoutHeader() {
     return uncompressedSizeWithoutHeader;
   }
 
-  /**
-   * @return the offset of the previous block of the same type in the file, or -1 if unknown
-   */
+  /** Returns the offset of the previous block of the same type in the file, or -1 if unknown */
   long getPrevBlockOffset() {
     return prevBlockOffset;
   }
@@ -477,18 +496,8 @@ public class HFileBlock implements Cacheable {
    * @return the buffer with header skipped and checksum omitted.
    */
   public ByteBuff getBufferWithoutHeader() {
-    return this.getBufferWithoutHeader(false);
-  }
-
-  /**
-   * Returns a buffer that does not include the header or checksum.
-   * @param withChecksum to indicate whether include the checksum or not.
-   * @return the buffer with header skipped and checksum omitted.
-   */
-  public ByteBuff getBufferWithoutHeader(boolean withChecksum) {
     ByteBuff dup = getBufferReadOnly();
-    int delta = withChecksum ? 0 : totalChecksumBytes();
-    return dup.position(headerSize()).limit(buf.limit() - delta).slice();
+    return dup.position(headerSize()).slice();
   }
 
   /**
@@ -552,19 +561,21 @@ public class HFileBlock implements Cacheable {
       sanityCheckAssertion(dup.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader");
     }
 
-    int cksumBytes = totalChecksumBytes();
-    int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
-    if (dup.limit() != expectedBufLimit) {
-      throw new AssertionError("Expected limit " + expectedBufLimit + ", got " + dup.limit());
+    if (dup.limit() != onDiskDataSizeWithHeader) {
+      throw new AssertionError(
+        "Expected limit " + onDiskDataSizeWithHeader + ", got " + dup.limit());
     }
 
     // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next
     // block's header, so there are two sensible values for buffer capacity.
     int hdrSize = headerSize();
     dup.rewind();
-    if (dup.remaining() != expectedBufLimit && dup.remaining() != expectedBufLimit + hdrSize) {
+    if (
+      dup.remaining() != onDiskDataSizeWithHeader
+        && dup.remaining() != onDiskDataSizeWithHeader + hdrSize
+    ) {
       throw new AssertionError("Invalid buffer capacity: " + dup.remaining() + ", expected "
-        + expectedBufLimit + " or " + (expectedBufLimit + hdrSize));
+        + onDiskDataSizeWithHeader + " or " + (onDiskDataSizeWithHeader + hdrSize));
     }
   }
 
@@ -616,20 +627,24 @@ public class HFileBlock implements Cacheable {
       return this;
     }
 
-    HFileBlock unpacked = shallowClone(this);
-    unpacked.allocateBuffer(); // allocates space for the decompressed block
+    ByteBuff newBuf = allocateBufferForUnpacking(); // allocates space for the decompressed block
+    HFileBlock unpacked = shallowClone(this, newBuf);
+
     boolean succ = false;
-    try {
+    final Context context =
+      Context.current().with(CONTEXT_KEY, new HFileContextAttributesBuilderConsumer(fileContext));
+    try (Scope ignored = context.makeCurrent()) {
       HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
         ? reader.getBlockDecodingContext()
         : reader.getDefaultBlockDecodingContext();
       // Create a duplicated buffer without the header part.
+      int headerSize = this.headerSize();
       ByteBuff dup = this.buf.duplicate();
-      dup.position(this.headerSize());
+      dup.position(headerSize);
       dup = dup.slice();
       // Decode the dup into unpacked#buf
-      ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
-        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
+      ctx.prepareDecoding(unpacked.getOnDiskDataSizeWithHeader() - headerSize,
+        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup);
       succ = true;
       return unpacked;
     } finally {
@@ -643,20 +658,20 @@ public class HFileBlock implements Cacheable {
    * Always allocates a new buffer of the correct size. Copies header bytes from the existing
    * buffer. Does not change header fields. Reserve room to keep checksum bytes too.
    */
-  private void allocateBuffer() {
-    int cksumBytes = totalChecksumBytes();
+  private ByteBuff allocateBufferForUnpacking() {
     int headerSize = headerSize();
-    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
+    int capacityNeeded = headerSize + uncompressedSizeWithoutHeader;
 
+    ByteBuff source = buf.duplicate();
     ByteBuff newBuf = allocator.allocate(capacityNeeded);
 
     // Copy header bytes into newBuf.
-    buf.position(0);
-    newBuf.put(0, buf, 0, headerSize);
+    source.position(0);
+    newBuf.put(0, source, 0, headerSize);
 
-    buf = newBuf;
     // set limit to exclude next block's header
-    buf.limit(capacityNeeded);
+    newBuf.limit(capacityNeeded);
+    return newBuf;
   }
 
   /**
@@ -664,9 +679,8 @@ public class HFileBlock implements Cacheable {
    * calculated heuristic, not tracked attribute of the block.
    */
   public boolean isUnpacked() {
-    final int cksumBytes = totalChecksumBytes();
     final int headerSize = headerSize();
-    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
+    final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader;
     final int bufCapacity = buf.remaining();
     return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
   }
@@ -683,9 +697,7 @@ public class HFileBlock implements Cacheable {
     return offset;
   }
 
-  /**
-   * @return a byte stream reading the data + checksum of this block
-   */
+  /** Returns a byte stream reading the data + checksum of this block */
   DataInputStream getByteStream() {
     ByteBuff dup = this.buf.duplicate();
     dup.position(this.headerSize());
@@ -708,11 +720,6 @@ public class HFileBlock implements Cacheable {
    * by default.
    */
   public boolean isSharedMem() {
-    if (this instanceof SharedMemHFileBlock) {
-      return true;
-    } else if (this instanceof ExclusiveMemHFileBlock) {
-      return false;
-    }
     return true;
   }
 
@@ -735,6 +742,10 @@ public class HFileBlock implements Cacheable {
       BLOCK_READY
     }
 
+    private int maxSizeUnCompressed;
+
+    private BlockCompressedSizePredicator compressedSizePredicator;
+
     /** Writer state. Used to ensure the correct usage protocol. */
     private State state = State.INIT;
 
@@ -813,11 +824,11 @@ public class HFileBlock implements Cacheable {
      */
     public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
       HFileContext fileContext) {
-      this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
+      this(conf, dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP, fileContext.getBlocksize());
     }
 
     public Writer(Configuration conf, HFileDataBlockEncoder dataBlockEncoder,
-      HFileContext fileContext, ByteBuffAllocator allocator) {
+      HFileContext fileContext, ByteBuffAllocator allocator, int maxSizeUnCompressed) {
       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
         throw new RuntimeException("Unsupported value of bytesPerChecksum. " + " Minimum is "
           + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is "
@@ -840,6 +851,10 @@ public class HFileBlock implements Cacheable {
       // TODO: Why fileContext saved away when we have dataBlockEncoder and/or
       // defaultDataBlockEncoder?
       this.fileContext = fileContext;
+      this.compressedSizePredicator = (BlockCompressedSizePredicator) ReflectionUtils.newInstance(
+        conf.getClass(BLOCK_COMPRESSED_SIZE_PREDICATOR, UncompressedBlockSizePredicator.class),
+        new Configuration(conf));
+      this.maxSizeUnCompressed = maxSizeUnCompressed;
     }
 
     /**
@@ -892,6 +907,15 @@ public class HFileBlock implements Cacheable {
       finishBlock();
     }
 
+    public boolean checkBoundariesWithPredicate() {
+      int rawBlockSize = encodedBlockSizeWritten();
+      if (rawBlockSize >= maxSizeUnCompressed) {
+        return true;
+      } else {
+        return compressedSizePredicator.shouldFinishBlock(rawBlockSize);
+      }
+    }
+
     /**
      * Finish up writing of the block. Flushes the compressing stream (if using compression), fills
      * out the header, does any compression/encryption of bytes to flush out to disk, and manages
@@ -906,6 +930,11 @@ public class HFileBlock implements Cacheable {
       userDataStream.flush();
       prevOffset = prevOffsetByType[blockType.getId()];
 
+      // We need to cache the unencoded/uncompressed size before changing the block state
+      int rawBlockSize = 0;
+      if (this.getEncodingState() != null) {
+        rawBlockSize = blockSizeWritten();
+      }
       // We need to set state before we can package the block up for cache-on-write. In a way, the
       // block is ready, but not yet encoded or compressed.
       state = State.BLOCK_READY;
@@ -926,6 +955,10 @@ public class HFileBlock implements Cacheable {
       onDiskBlockBytesWithHeader.reset();
       onDiskBlockBytesWithHeader.write(compressAndEncryptDat.get(),
         compressAndEncryptDat.getOffset(), compressAndEncryptDat.getLength());
+      // Update raw and compressed sizes in the predicate
+      compressedSizePredicator.updateLatestBlockSizes(fileContext, rawBlockSize,
+        onDiskBlockBytesWithHeader.size());
+
       // Calculate how many bytes we need for checksum on the tail of the block.
       int numBytes = (int) ChecksumUtil.numBytes(onDiskBlockBytesWithHeader.size(),
         fileContext.getBytesPerChecksum());
@@ -933,6 +966,7 @@ public class HFileBlock implements Cacheable {
       // Put the header for the on disk bytes; header currently is unfilled-out
       putHeader(onDiskBlockBytesWithHeader, onDiskBlockBytesWithHeader.size() + numBytes,
         baosInMemory.size(), onDiskBlockBytesWithHeader.size());
+
       if (onDiskChecksum.length != numBytes) {
         onDiskChecksum = new byte[numBytes];
       }
@@ -1072,12 +1106,12 @@ public class HFileBlock implements Cacheable {
     /**
      * The uncompressed size of the block data, including header size.
      */
-    int getUncompressedSizeWithHeader() {
+    public int getUncompressedSizeWithHeader() {
       expectState(State.BLOCK_READY);
       return baosInMemory.size();
     }
 
-    /** @return true if a block is being written */
+    /** Returns true if a block is being written */
     boolean isWriting() {
       return state == State.WRITING;
     }
@@ -1096,7 +1130,7 @@ public class HFileBlock implements Cacheable {
      * block at the moment. Note that this will return zero in the "block ready" state as well.
      * @return the number of bytes written
      */
-    int blockSizeWritten() {
+    public int blockSizeWritten() {
       return state != State.WRITING ? 0 : this.getEncodingState().getUnencodedDataSizeWritten();
     }
 
@@ -1485,63 +1519,69 @@ public class HFileBlock implements Cacheable {
       boolean updateMetrics, boolean intoHeap) throws IOException {
       // Get a copy of the current state of whether to validate
       // hbase checksums or not for this read call. This is not
-      // thread-safe but the one constaint is that if we decide
+      // thread-safe but the one constraint is that if we decide
       // to skip hbase checksum verification then we are
       // guaranteed to use hdfs checksum verification.
       boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
       FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
-
-      HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
-        doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
-      if (blk == null) {
-        HFile.LOG.warn("HBase checksum verification failed for file " + pathName + " at offset "
-          + offset + " filesize " + fileSize + ". Retrying read with HDFS checksums turned on...");
-
-        if (!doVerificationThruHBaseChecksum) {
-          String msg = "HBase checksum verification failed for file " + pathName + " at offset "
-            + offset + " filesize " + fileSize + " but this cannot happen because doVerify is "
-            + doVerificationThruHBaseChecksum;
-          HFile.LOG.warn(msg);
-          throw new IOException(msg); // cannot happen case here
-        }
-        HFile.CHECKSUM_FAILURES.increment(); // update metrics
-
-        // If we have a checksum failure, we fall back into a mode where
-        // the next few reads use HDFS level checksums. We aim to make the
-        // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
-        // hbase checksum verification, but since this value is set without
-        // holding any locks, it can so happen that we might actually do
-        // a few more than precisely this number.
-        is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
-        doVerificationThruHBaseChecksum = false;
-        blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
+      final Context context = Context.current().with(CONTEXT_KEY,
+        new HFileContextAttributesBuilderConsumer(fileContext)
+          .setSkipChecksum(doVerificationThruHBaseChecksum)
+          .setReadType(pread ? ReadType.POSITIONAL_READ : ReadType.SEEK_PLUS_READ));
+      try (Scope ignored = context.makeCurrent()) {
+        HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
           doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
-        if (blk != null) {
-          HFile.LOG.warn("HDFS checksum verification succeeded for file " + pathName + " at offset "
-            + offset + " filesize " + fileSize);
+        if (blk == null) {
+          HFile.LOG.warn("HBase checksum verification failed for file {} at offset {} filesize {}."
+            + " Retrying read with HDFS checksums turned on...", pathName, offset, fileSize);
+
+          if (!doVerificationThruHBaseChecksum) {
+            String msg = "HBase checksum verification failed for file " + pathName + " at offset "
+              + offset + " filesize " + fileSize + " but this cannot happen because doVerify is "
+              + doVerificationThruHBaseChecksum;
+            HFile.LOG.warn(msg);
+            throw new IOException(msg); // cannot happen case here
+          }
+          HFile.CHECKSUM_FAILURES.increment(); // update metrics
+
+          // If we have a checksum failure, we fall back into a mode where
+          // the next few reads use HDFS level checksums. We aim to make the
+          // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
+          // hbase checksum verification, but since this value is set without
+          // holding any locks, it can so happen that we might actually do
+          // a few more than precisely this number.
+          is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
+          doVerificationThruHBaseChecksum = false;
+          blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
+            doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
+          if (blk != null) {
+            HFile.LOG.warn(
+              "HDFS checksum verification succeeded for file {} at offset {} filesize" + " {}",
+              pathName, offset, fileSize);
+          }
+        }
+        if (blk == null && !doVerificationThruHBaseChecksum) {
+          String msg =
+            "readBlockData failed, possibly due to " + "checksum verification failed for file "
+              + pathName + " at offset " + offset + " filesize " + fileSize;
+          HFile.LOG.warn(msg);
+          throw new IOException(msg);
         }
-      }
-      if (blk == null && !doVerificationThruHBaseChecksum) {
-        String msg =
-          "readBlockData failed, possibly due to " + "checksum verification failed for file "
-            + pathName + " at offset " + offset + " filesize " + fileSize;
-        HFile.LOG.warn(msg);
-        throw new IOException(msg);
-      }
 
-      // If there is a checksum mismatch earlier, then retry with
-      // HBase checksums switched off and use HDFS checksum verification.
-      // This triggers HDFS to detect and fix corrupt replicas. The
-      // next checksumOffCount read requests will use HDFS checksums.
-      // The decrementing of this.checksumOffCount is not thread-safe,
-      // but it is harmless because eventually checksumOffCount will be
-      // a negative number.
-      streamWrapper.checksumOk();
-      return blk;
+        // If there is a checksum mismatch earlier, then retry with
+        // HBase checksums switched off and use HDFS checksum verification.
+        // This triggers HDFS to detect and fix corrupt replicas. The
+        // next checksumOffCount read requests will use HDFS checksums.
+        // The decrementing of this.checksumOffCount is not thread-safe,
+        // but it is harmless because eventually checksumOffCount will be
+        // a negative number.
+        streamWrapper.checksumOk();
+        return blk;
+      }
     }
 
     /**
-     * @return Check <code>onDiskSizeWithHeaderL</code> size is healthy and then return it as an int
+     * Returns Check <code>onDiskSizeWithHeaderL</code> size is healthy and then return it as an int
      */
     private static int checkAndGetSizeAsInt(final long onDiskSizeWithHeaderL, final int hdrSize)
       throws IOException {
@@ -1635,6 +1675,11 @@ public class HFileBlock implements Cacheable {
         throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize="
           + onDiskSizeWithHeaderL + ")");
       }
+
+      final Span span = Span.current();
+      final AttributesBuilder attributesBuilder = Attributes.builder();
+      Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY))
+        .ifPresent(c -> c.accept(attributesBuilder));
       int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL, hdrSize);
       // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1
       // and will save us having to seek the stream backwards to reread the header we
@@ -1659,8 +1704,9 @@ public class HFileBlock implements Cacheable {
         // in a LOG every time we seek. See HBASE-17072 for more detail.
         if (headerBuf == null) {
           if (LOG.isTraceEnabled()) {
-            LOG.trace("Extra see to get block size!", new RuntimeException());
+            LOG.trace("Extra seek to get block size!", new RuntimeException());
           }
+          span.addEvent("Extra seek to get block size!", attributesBuilder.build());
           headerBuf = HEAP.allocate(hdrSize);
           readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
           headerBuf.rewind();
@@ -1696,6 +1742,9 @@ public class HFileBlock implements Cacheable {
         if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) {
           return null;
         }
+        // remove checksum from buffer now that it's verified
+        int sizeWithoutChecksum = curBlock.getInt(Header.ON_DISK_DATA_SIZE_WITH_HEADER_INDEX);
+        curBlock.limit(sizeWithoutChecksum);
         long duration = EnvironmentEdgeManager.currentTime() - startTime;
         if (updateMetrics) {
           HFile.updateReadLatency(duration, pread);
@@ -1710,6 +1759,7 @@ public class HFileBlock implements Cacheable {
           hFileBlock.sanityCheckUncompressed();
         }
         LOG.trace("Read {} in {} ms", hFileBlock, duration);
+        span.addEvent("Read block", attributesBuilder.build());
         // Cache next block header if we read it for the next time through here.
         if (nextBlockOnDiskSize != -1) {
           cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock,
@@ -1915,7 +1965,7 @@ public class HFileBlock implements Cacheable {
     return this.fileContext.getBytesPerChecksum();
   }
 
-  /** @return the size of data on disk + header. Excludes checksum. */
+  /** Returns the size of data on disk + header. Excludes checksum. */
   int getOnDiskDataSizeWithHeader() {
     return this.onDiskDataSizeWithHeader;
   }
@@ -1997,23 +2047,31 @@ public class HFileBlock implements Cacheable {
       + onDiskDataSizeWithHeader;
   }
 
-  private static HFileBlockBuilder createBuilder(HFileBlock blk) {
+  /**
+   * Creates a new HFileBlockBuilder from the existing block and a new ByteBuff. The builder will be
+   * loaded with all of the original fields from blk, except now using the newBuff and setting
+   * isSharedMem based on the source of the passed in newBuff. An existing HFileBlock may have been
+   * an {@link ExclusiveMemHFileBlock}, but the new buffer might call for a
+   * {@link SharedMemHFileBlock}. Or vice versa.
+   * @param blk     the block to clone from
+   * @param newBuff the new buffer to use
+   */
+  private static HFileBlockBuilder createBuilder(HFileBlock blk, ByteBuff newBuff) {
     return new HFileBlockBuilder().withBlockType(blk.blockType)
       .withOnDiskSizeWithoutHeader(blk.onDiskSizeWithoutHeader)
       .withUncompressedSizeWithoutHeader(blk.uncompressedSizeWithoutHeader)
-      .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(blk.buf.duplicate()) // Duplicate the
-                                                                                  // buffer.
-      .withOffset(blk.offset).withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
+      .withPrevBlockOffset(blk.prevBlockOffset).withByteBuff(newBuff).withOffset(blk.offset)
+      .withOnDiskDataSizeWithHeader(blk.onDiskDataSizeWithHeader)
       .withNextBlockOnDiskSize(blk.nextBlockOnDiskSize).withHFileContext(blk.fileContext)
-      .withByteBuffAllocator(blk.allocator).withShared(blk.isSharedMem());
+      .withByteBuffAllocator(blk.allocator).withShared(!newBuff.hasArray());
   }
 
-  static HFileBlock shallowClone(HFileBlock blk) {
-    return createBuilder(blk).build();
+  private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
+    return createBuilder(blk, newBuf).build();
   }
 
   static HFileBlock deepCloneOnHeap(HFileBlock blk) {
     ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
-    return createBuilder(blk).withByteBuff(deepCloned).withShared(false).build();
+    return createBuilder(blk, deepCloned).build();
   }
 }