HADOOP-16040. ABFS: Bug fix for tolerateOobAppends configuration.
author: Da Zhou <da.zhou@microsoft.com>
Thu, 10 Jan 2019 11:58:39 +0000 (11:58 +0000)
committer: Steve Loughran <stevel@apache.org>
Thu, 10 Jan 2019 11:58:39 +0000 (11:58 +0000)
Contributed by Da Zhou.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java

index b2bd64f..222285f 100644 (file)
@@ -374,7 +374,8 @@ public class AzureBlobFileSystemStore {
     // Add statistics for InputStream
     return new AbfsInputStream(client, statistics,
             AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
-                abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), eTag);
+                abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(),
+                abfsConfiguration.getTolerateOobAppends(), eTag);
   }
 
   public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws
index 9593679..fe48cb9 100644 (file)
@@ -61,6 +61,7 @@ public class AbfsInputStream extends FSInputStream {
       final long contentLength,
       final int bufferSize,
       final int readAheadQueueDepth,
+      final boolean tolerateOobAppends,
       final String eTag) {
     this.client = client;
     this.statistics = statistics;
@@ -68,8 +69,8 @@ public class AbfsInputStream extends FSInputStream {
     this.contentLength = contentLength;
     this.bufferSize = bufferSize;
     this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
+    this.tolerateOobAppends = tolerateOobAppends;
     this.eTag = eTag;
-    this.tolerateOobAppends = false;
     this.readAheadEnabled = true;
   }
 
index 9e22790..ebc9c07 100644 (file)
@@ -25,12 +25,14 @@ import java.util.Random;
 
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
@@ -66,7 +68,9 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
   }
 
   @Test (expected = IOException.class)
-  public void testOOBWrites() throws Exception {
+  public void testOOBWritesAndReadFail() throws Exception {
+    Configuration conf = this.getRawConfiguration();
+    conf.setBoolean(AZURE_TOLERATE_CONCURRENT_APPEND, false);
     final AzureBlobFileSystem fs = getFileSystem();
     int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize();
 
@@ -83,7 +87,6 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
     try (FSDataInputStream readStream = fs.open(testFilePath)) {
       assertEquals(readBufferSize,
           readStream.read(bytesToRead, 0, readBufferSize));
-
       try (FSDataOutputStream writeStream = fs.create(testFilePath)) {
         writeStream.write(b);
         writeStream.flush();
@@ -95,6 +98,36 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
   }
 
   @Test
+  public void testOOBWritesAndReadSucceed() throws Exception {
+    Configuration conf = this.getRawConfiguration();
+    conf.setBoolean(AZURE_TOLERATE_CONCURRENT_APPEND, true);
+    final AzureBlobFileSystem fs = getFileSystem(conf);
+    int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize();
+
+    byte[] bytesToRead = new byte[readBufferSize];
+    final byte[] b = new byte[2 * readBufferSize];
+    new Random().nextBytes(b);
+    final Path testFilePath = new Path(methodName.getMethodName());
+
+    try (FSDataOutputStream writeStream = fs.create(testFilePath)) {
+      writeStream.write(b);
+      writeStream.flush();
+    }
+
+    try (FSDataInputStream readStream = fs.open(testFilePath)) {
+      // Read
+      assertEquals(readBufferSize, readStream.read(bytesToRead, 0, readBufferSize));
+      // Concurrent write
+      try (FSDataOutputStream writeStream = fs.create(testFilePath)) {
+        writeStream.write(b);
+        writeStream.flush();
+      }
+
+      assertEquals(readBufferSize, readStream.read(bytesToRead, 0, readBufferSize));
+    }
+  }
+
+  @Test
   public void testWriteWithBufferOffset() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
     final Path testFilePath = new Path(methodName.getMethodName());