NIFI-5879: Fixed bug in FileSystemRepository that can occur if an InputStream is...
authorMark Payne <markap14@hotmail.com>
Thu, 6 Dec 2018 21:22:29 +0000 (16:22 -0500)
committerMatthew Burgess <mattyb149@apache.org>
Mon, 7 Jan 2019 16:06:17 +0000 (11:06 -0500)
Signed-off-by: Matthew Burgess <mattyb149@apache.org>
This closes #3207

nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/LimitedInputStream.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestLimitedInputStream.java

index c041f5c..125cd50 100644 (file)
@@ -864,9 +864,16 @@ public class FileSystemRepository implements ContentRepository {
 
         }
 
-        // see javadocs for claim.getLength() as to why we do this.
+        // A claim length of -1 indicates that the claim is still being written to and we don't know
+        // the length. In this case, we don't limit the Input Stream. If the Length has been populated, though,
+        // it is possible that the Length could then be extended. However, we do want to avoid ever allowing the
+        // stream to read past the end of the Content Claim. To accomplish this, we use a LimitedInputStream but
+        // provide a LongSupplier for the length instead of a Long value. this allows us to continue reading until
+        // we get to the end of the Claim, even if the Claim grows. This may happen, for instance, if we obtain an
+        // InputStream for this claim, then read from it, write more to the claim, and then attempt to read again. In
+        // such a case, since we have written to that same Claim, we should still be able to read those bytes.
         if (claim.getLength() >= 0) {
-            return new LimitedInputStream(fis, claim.getLength());
+            return new LimitedInputStream(fis, claim::getLength);
         } else {
             return fis;
         }
index 4354dc4..cc3ac19 100644 (file)
@@ -2267,7 +2267,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final StandardRepositoryRecord record = getRecord(source);
 
         try {
-            ensureNotAppending(record.getCurrentClaim());
+            final ContentClaim currentClaim = record.getCurrentClaim();
+            ensureNotAppending(currentClaim);
+            claimCache.flush(currentClaim);
         } catch (final IOException e) {
             throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
         }
index 74597ae..7c32cc8 100644 (file)
@@ -18,21 +18,36 @@ package org.apache.nifi.controller.repository.io;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Objects;
+import java.util.function.LongSupplier;
 
 public class LimitedInputStream extends InputStream {
 
     private final InputStream in;
-    private long limit;
+    private final long limit;
+    private final LongSupplier limitSupplier;
     private long bytesRead = 0;
+    private long markOffset = -1L;
+
+    public LimitedInputStream(final InputStream in, final LongSupplier limitSupplier) {
+        this.in = in;
+        this.limitSupplier = Objects.requireNonNull(limitSupplier);
+        this.limit = -1;
+    }
 
     public LimitedInputStream(final InputStream in, final long limit) {
         this.in = in;
         this.limit = limit;
+        this.limitSupplier = null;
+    }
+
+    private long getLimit() {
+        return limitSupplier == null ? limit : limitSupplier.getAsLong();
     }
 
     @Override
     public int read() throws IOException {
-        if (bytesRead >= limit) {
+        if (bytesRead >= getLimit()) {
             return -1;
         }
 
@@ -45,6 +60,7 @@ public class LimitedInputStream extends InputStream {
 
     @Override
     public int read(final byte[] b) throws IOException {
+        final long limit = getLimit();
         if (bytesRead >= limit) {
             return -1;
         }
@@ -60,6 +76,7 @@ public class LimitedInputStream extends InputStream {
 
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
+        final long limit = getLimit();
         if (bytesRead >= limit) {
             return -1;
         }
@@ -75,14 +92,14 @@ public class LimitedInputStream extends InputStream {
 
     @Override
     public long skip(final long n) throws IOException {
-        final long skipped = in.skip(Math.min(n, limit - bytesRead));
+        final long skipped = in.skip(Math.min(n, getLimit() - bytesRead));
         bytesRead += skipped;
         return skipped;
     }
 
     @Override
     public int available() throws IOException {
-        return (int)(limit - bytesRead);
+        return (int)(getLimit() - bytesRead);
     }
 
     @Override
@@ -93,8 +110,7 @@ public class LimitedInputStream extends InputStream {
     @Override
     public void mark(int readlimit) {
         in.mark(readlimit);
-        limit -= bytesRead;
-        bytesRead = 0;
+        markOffset = bytesRead;
     }
 
     @Override
@@ -105,6 +121,10 @@ public class LimitedInputStream extends InputStream {
     @Override
     public void reset() throws IOException {
         in.reset();
-        bytesRead = 0;
+
+        if (markOffset >= 0) {
+            bytesRead = markOffset;
+        }
+        markOffset = -1;
     }
 }
index 3ecff71..bf1a579 100644 (file)
  */
 package org.apache.nifi.controller.repository;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
+import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.util.DiskUtils;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -45,24 +56,12 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
-import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
-import org.apache.nifi.controller.repository.util.DiskUtils;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.util.NiFiProperties;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.Logger;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeFalse;
 
 public class TestFileSystemRepository {
@@ -192,6 +191,28 @@ public class TestFileSystemRepository {
     }
 
     @Test
+    public void testReadClaimThenWriteThenReadMore() throws IOException {
+        final ContentClaim claim = repository.create(false);
+
+        final OutputStream out = repository.write(claim);
+        out.write("hello".getBytes());
+        out.flush();
+
+        final InputStream in = repository.read(claim);
+        final byte[] buffer = new byte[5];
+        StreamUtils.fillBuffer(in, buffer);
+
+        assertEquals("hello", new String(buffer));
+
+        out.write("good-bye".getBytes());
+        out.close();
+
+        final byte[] buffer2 = new byte[8];
+        StreamUtils.fillBuffer(in, buffer2);
+        assertEquals("good-bye", new String(buffer2));
+    }
+
+    @Test
     public void testClaimantCounts() throws IOException {
         final ContentClaim claim = repository.create(true);
         assertNotNull(claim);
index 7b1e64d..129fed6 100644 (file)
  */
 package org.apache.nifi.controller.repository.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.fail;
+import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class TestLimitedInputStream {
 
@@ -70,9 +70,11 @@ public class TestLimitedInputStream {
     @Test
     public void testSkip() throws Exception {
         final LimitedInputStream lis = new LimitedInputStream(bais, 4);
+        lis.mark(4);
         assertEquals(3, lis.read(buffer3));
         assertEquals(1, lis.skip(data.length));
         lis.reset();
+        lis.mark(4);
         assertEquals(4, lis.skip(7));
         lis.reset();
         assertEquals(2, lis.skip(2));
@@ -91,7 +93,7 @@ public class TestLimitedInputStream {
     @Test
     public void testAvailable() throws Exception {
         final LimitedInputStream lis = new LimitedInputStream(bais, 4);
-        assertNotEquals(data.length, lis.available());
+        assertEquals(4, lis.available());
         lis.reset();
         assertEquals(4, lis.available());
         assertEquals(1, lis.read(buffer3, 0, 1));
@@ -107,14 +109,15 @@ public class TestLimitedInputStream {
     @Test
     public void testMark() throws Exception {
         final LimitedInputStream lis = new LimitedInputStream(bais, 6);
+        lis.mark(1000);
         assertEquals(3, lis.read(buffer3));
         assertEquals(3, lis.read(buffer10));
         lis.reset();
-        assertEquals(3, lis.read(buffer3));
         lis.mark(1000);
+        assertEquals(3, lis.read(buffer3));
         assertEquals(3, lis.read(buffer10));
         lis.reset();
-        assertEquals(3, lis.read(buffer10));
+        assertEquals(6, lis.read(buffer10));
     }
 
 }