Fixed bug with continuance introed in #71 88/head
authorKeith Turner <kturner@apache.org>
Fri, 15 Jul 2016 20:43:50 +0000 (16:43 -0400)
committerKeith Turner <kturner@apache.org>
Fri, 15 Jul 2016 22:59:15 +0000 (18:59 -0400)
modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java
modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java
modules/core/src/test/java/org/apache/fluo/recipes/core/map/BigUpdateIT.java

index e09a376..2c41313 100644 (file)
@@ -125,10 +125,13 @@ public class ExportObserver<K, V> extends AbstractObserver {
 
     exporter.processExports(exportIterator);
 
-    if (input.hasNext()) {
-      // not everything was processed so notify self
+    if (input.hasNext() || continueRow != null) {
+      // not everything was processed so notify self OR new data may have been inserted above the
+      // continue row
       bucket.notifyExportObserver();
+    }
 
+    if (input.hasNext()) {
       if (!memLimitIter.hasNext()) {
         // stopped because of mem limit... set continue key
         bucket.setContinueRow(input.next());
index c6c0918..ccb250f 100644 (file)
@@ -125,8 +125,6 @@ public class CollisionFreeMap<K, V> {
       span = Span.prefix(ntfyRow);
     }
 
-    // TODO
-    span = Span.prefix(ntfyRow);
     Iterator<RowColumnValue> iter = tx.scanner().over(span).fetch(UPDATE_COL).build().iterator();
 
     Map<Bytes, List<Bytes>> updates = new HashMap<>();
@@ -134,6 +132,7 @@ public class CollisionFreeMap<K, V> {
     long approxMemUsed = 0;
 
     Bytes partiallyReadKey = null;
+    boolean setNextKey = false;
 
     if (iter.hasNext()) {
       Bytes lastKey = null;
@@ -178,8 +177,7 @@ public class CollisionFreeMap<K, V> {
           tx.set(ntfyRow, NEXT_COL, nextPossible);
         }
 
-        // may not read all data because of mem limit, so notify self
-        tx.setWeakNotification(ntfyRow, col);
+        setNextKey = true;
       } else if (nextKey != null) {
         // clear nextKey
         tx.delete(ntfyRow, NEXT_COL);
@@ -188,6 +186,14 @@ public class CollisionFreeMap<K, V> {
       tx.delete(ntfyRow, NEXT_COL);
     }
 
+    if (nextKey != null || setNextKey) {
+      // If not all data was read need to run again in the future. If scanning was started in the
+      // middle of the bucket, its possible there is new data before nextKey that still needs to be
+      // processed. If scanning stopped before reading the entire bucket there may be data after the
+      // stop point.
+      tx.setWeakNotification(ntfyRow, col);
+    }
+
     byte[] dataPrefix = ntfyRow.toArray();
     // TODO this is awful... no sanity check... hard to read
     dataPrefix[Bytes.of(mapId).length() + 1] = 'd';
index 852d117..66056c6 100644 (file)
@@ -177,7 +177,6 @@ public class BigUpdateIT {
   }
 
   private void checkUpdates(TypedSnapshot snap, long expectedVal, long expectedRows) {
-
     RowScanner rows = snap.scanner().over(Span.prefix("side:")).byRow().build();
 
     int row = 0;