Avoid allocating collection in AccumuloExporter 112/head
authorKeith Turner <kturner@apache.org>
Wed, 12 Oct 2016 19:26:47 +0000 (15:26 -0400)
committerKeith Turner <keith@deenlo.com>
Thu, 13 Oct 2016 20:59:32 +0000 (16:59 -0400)
docs/accumulo-export-queue.md
modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java
modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloReplicator.java
modules/accumulo/src/test/java/org/apache/fluo/recipes/accumulo/export/AccumuloExportTest.java
modules/test/src/test/java/org/apache/fluo/recipes/test/export/SimpleExporter.java

index 33a1c97..b880d58 100644 (file)
@@ -24,10 +24,10 @@ Exporting to Accumulo is easy. Follow the steps below:
     public class SimpleExporter extends AccumuloExporter<String, String> {
 
       @Override
-      protected Collection<Mutation> translate(SequencedExport<String, String> export) {
+      protected void translate(SequencedExport<String, String> export, Consumer<Mutation> consumer) {
         Mutation m = new Mutation(export.getKey());
         m.put("cf", "cq", export.getSequence(), export.getValue());
-        return Collections.singleton(m);
+        consumer.accept(m);
       }
     }
     ```
index 4cb0730..6a19362 100644 (file)
 package org.apache.fluo.recipes.accumulo.export;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.function.Consumer;
 
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.fluo.api.config.SimpleConfiguration;
@@ -68,9 +68,11 @@ public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
 
     ArrayList<Mutation> buffer = new ArrayList<>();
 
+    Consumer<Mutation> consumer = m -> buffer.add(m);
+
     while (exports.hasNext()) {
       SequencedExport<K, V> export = exports.next();
-      buffer.addAll(translate(export));
+      translate(export, consumer);
     }
 
     if (buffer.size() > 0) {
@@ -78,7 +80,14 @@ public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
     }
   }
 
-  protected abstract Collection<Mutation> translate(SequencedExport<K, V> export);
+  /**
+   * Implementations of this method should translate the given SequencedExport to 0 or more
+   * Mutations.
+   * 
+   * @param export the input that should be translated to mutations
+   * @param consumer output mutations to this consumer
+   */
+  protected abstract void translate(SequencedExport<K, V> export, Consumer<Mutation> consumer);
 
   /**
    * Generates Accumulo mutations by comparing the differences between a RowColumn/Bytes map that is
@@ -95,12 +104,13 @@ public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
    * <li>The export sequence number is used for the timestamp in the mutation.
    * </ul>
    *
+   * @param consumer generated mutations will be output to this consumer
    * @param oldData Map containing old row/column data
    * @param newData Map containing new row/column data
    * @param seq Export sequence number
    */
-  public static Collection<Mutation> generateMutations(long seq, Map<RowColumn, Bytes> oldData,
-      Map<RowColumn, Bytes> newData) {
+  public static void generateMutations(long seq, Map<RowColumn, Bytes> oldData,
+      Map<RowColumn, Bytes> newData, Consumer<Mutation> consumer) {
     Map<Bytes, Mutation> mutationMap = new HashMap<>();
     for (Map.Entry<RowColumn, Bytes> entry : oldData.entrySet()) {
       RowColumn rc = entry.getKey();
@@ -120,7 +130,7 @@ public abstract class AccumuloExporter<K, V> extends Exporter<K, V> {
         m.put(col.getFamily().toArray(), col.getQualifier().toArray(), seq, newVal.toArray());
       }
     }
-    return mutationMap.values();
-  }
 
+    mutationMap.values().forEach(consumer);
+  }
 }
index abaecfd..ed16fc7 100644 (file)
@@ -15,9 +15,9 @@
 
 package org.apache.fluo.recipes.accumulo.export;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 
 import org.apache.accumulo.core.data.Mutation;
@@ -34,8 +34,8 @@ import org.apache.fluo.recipes.core.transaction.TxLog;
 public class AccumuloReplicator extends AccumuloExporter<String, TxLog> {
 
   @Override
-  protected Collection<Mutation> translate(SequencedExport<String, TxLog> export) {
-    return generateMutations(export.getSequence(), export.getValue());
+  protected void translate(SequencedExport<String, TxLog> export, Consumer<Mutation> consumer) {
+    generateMutations(export.getSequence(), export.getValue(), consumer);
   }
 
   /**
@@ -51,9 +51,9 @@ public class AccumuloReplicator extends AccumuloExporter<String, TxLog> {
    *
    * @param txLog Transaction log
    * @param seq Export sequence number
-   * @return Collection of mutations
+   * @param consumer generated mutations will be output to this consumer
    */
-  public static Collection<Mutation> generateMutations(long seq, TxLog txLog) {
+  public static void generateMutations(long seq, TxLog txLog, Consumer<Mutation> consumer) {
     Map<Bytes, Mutation> mutationMap = new HashMap<>();
     for (LogEntry le : txLog.getLogEntries()) {
       LogEntry.Operation op = le.getOp();
@@ -78,6 +78,6 @@ public class AccumuloReplicator extends AccumuloExporter<String, TxLog> {
         }
       }
     }
-    return mutationMap.values();
+    mutationMap.values().forEach(consumer);
   }
 }
index d0ebaee..53d2013 100644 (file)
 
 package org.apache.fluo.recipes.accumulo.export;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Consumer;
 
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.fluo.api.data.Bytes;
@@ -43,9 +45,9 @@ public class AccumuloExportTest {
     return rcMap;
   }
 
-  public static Collection<Mutation> genMutations(String key, long seq, Optional<String> oldVal,
-      Optional<String> newVal) {
-    return AccumuloExporter.generateMutations(seq, genData(key, oldVal), genData(key, newVal));
+  public static void genMutations(String key, long seq, Optional<String> oldVal,
+      Optional<String> newVal, Consumer<Mutation> consumer) {
+    AccumuloExporter.generateMutations(seq, genData(key, oldVal), genData(key, newVal), consumer);
   }
 
   public static Mutation makePut(String key, String val, long seq) {
@@ -70,33 +72,40 @@ public class AccumuloExportTest {
 
   @Test
   public void testDifferenceExport() {
-    Collection<Mutation> mutations;
+    final Collection<Mutation> mutations = new ArrayList<>();
+    Consumer<Mutation> consumer = m -> mutations.add(m);
 
-    mutations = genMutations("k1", 1, Optional.empty(), Optional.of("a"));
+    genMutations("k1", 1, Optional.empty(), Optional.of("a"), consumer);
     Assert.assertEquals(1, mutations.size());
     Assert.assertTrue(mutations.contains(makePut("k1", "a", 1)));
+    mutations.clear();
 
-    mutations = genMutations("k2", 2, Optional.of("ab"), Optional.of("ab"));
+    genMutations("k2", 2, Optional.of("ab"), Optional.of("ab"), consumer);
     Assert.assertEquals(0, mutations.size());
+    mutations.clear();
 
-    mutations = genMutations("k2", 2, Optional.of("b"), Optional.of("ab"));
+    genMutations("k2", 2, Optional.of("b"), Optional.of("ab"), consumer);
     Assert.assertEquals(1, mutations.size());
     Assert.assertTrue(mutations.contains(makePut("k2", "a", 2)));
+    mutations.clear();
 
-    mutations = genMutations("k3", 3, Optional.of("c"), Optional.of("d"));
+    genMutations("k3", 3, Optional.of("c"), Optional.of("d"), consumer);
     Assert.assertEquals(1, mutations.size());
     Mutation m = makeDel("k3", "c", 3);
     addPut(m, "k3", "d", 3);
     Assert.assertTrue(mutations.contains(m));
+    mutations.clear();
 
-    mutations = genMutations("k4", 4, Optional.of("e"), Optional.empty());
+    genMutations("k4", 4, Optional.of("e"), Optional.empty(), consumer);
     Assert.assertEquals(1, mutations.size());
     Assert.assertTrue(mutations.contains(makeDel("k4", "e", 4)));
+    mutations.clear();
 
-    mutations = genMutations("k5", 5, Optional.of("ef"), Optional.of("fg"));
+    genMutations("k5", 5, Optional.of("ef"), Optional.of("fg"), consumer);
     Assert.assertEquals(1, mutations.size());
     m = makeDel("k5", "e", 5);
     addPut(m, "k5", "g", 5);
     Assert.assertTrue(mutations.contains(m));
+    mutations.clear();
   }
 }
index 14c1aa4..d48857e 100644 (file)
@@ -15,8 +15,7 @@
 
 package org.apache.fluo.recipes.test.export;
 
-import java.util.Collection;
-import java.util.Collections;
+import java.util.function.Consumer;
 
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.fluo.recipes.accumulo.export.AccumuloExporter;
@@ -25,9 +24,9 @@ import org.apache.fluo.recipes.core.export.SequencedExport;
 public class SimpleExporter extends AccumuloExporter<String, String> {
 
   @Override
-  protected Collection<Mutation> translate(SequencedExport<String, String> export) {
+  protected void translate(SequencedExport<String, String> export, Consumer<Mutation> consumer) {
     Mutation m = new Mutation(export.getKey());
     m.put("cf", "cq", export.getSequence(), export.getValue());
-    return Collections.singleton(m);
+    consumer.accept(m);
   }
 }