Topology reconfiguration fixes (trunk)
author     Blake Eggleston <bdeggleston@gmail.com>
           Thu, 10 Mar 2022 20:46:17 +0000 (12:46 -0800)
committer  Blake Eggleston <blake@ultrablake.com>
           Mon, 11 Apr 2022 16:17:26 +0000 (09:17 -0700)
Patch by Blake Eggleston; Reviewed by Benedict Elliott Smith

47 files changed:
accord-core/src/main/java/accord/api/ConfigurationService.java
accord-core/src/main/java/accord/coordinate/Agree.java
accord-core/src/main/java/accord/coordinate/Coordinate.java
accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java
accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
accord-core/src/main/java/accord/local/Command.java
accord-core/src/main/java/accord/local/CommandStore.java
accord-core/src/main/java/accord/local/CommandStores.java
accord-core/src/main/java/accord/local/CommandsForKey.java
accord-core/src/main/java/accord/local/Node.java
accord-core/src/main/java/accord/messages/Accept.java
accord-core/src/main/java/accord/messages/Apply.java
accord-core/src/main/java/accord/messages/BeginRecovery.java
accord-core/src/main/java/accord/messages/Commit.java
accord-core/src/main/java/accord/messages/PreAccept.java
accord-core/src/main/java/accord/messages/ReadData.java
accord-core/src/main/java/accord/messages/TxnRequest.java
accord-core/src/main/java/accord/messages/WaitOnCommit.java
accord-core/src/main/java/accord/topology/KeyRanges.java
accord-core/src/main/java/accord/topology/Topology.java
accord-core/src/main/java/accord/topology/TopologyManager.java
accord-core/src/main/java/accord/txn/Keys.java
accord-core/src/main/java/accord/txn/Txn.java
accord-core/src/main/java/accord/txn/Writes.java
accord-core/src/main/java/accord/utils/DeterministicIdentitySet.java
accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java
accord-core/src/test/java/accord/KeysTest.java
accord-core/src/test/java/accord/burn/BurnTest.java
accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
accord-core/src/test/java/accord/burn/TopologyUpdate.java
accord-core/src/test/java/accord/coordinate/RecoverTest.java
accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
accord-core/src/test/java/accord/impl/basic/Cluster.java
accord-core/src/test/java/accord/impl/list/ListData.java
accord-core/src/test/java/accord/impl/list/ListWrite.java
accord-core/src/test/java/accord/impl/mock/EpochSync.java
accord-core/src/test/java/accord/impl/mock/MockCluster.java
accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java
accord-core/src/test/java/accord/messages/PreAcceptTest.java
accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
accord-core/src/test/java/accord/topology/TopologyRandomizer.java
accord-core/src/test/java/accord/utils/MessageTask.java
accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
accord-maelstrom/src/main/java/accord/maelstrom/MaelstromData.java
accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReplyContext.java
accord-maelstrom/src/main/java/accord/maelstrom/Main.java
accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java

index 10896446e88af52dd995dae79e815d9da72e1f84..7f5bf8850a5574d4b841cc66e34c6e433c7f3687 100644 (file)
@@ -68,11 +68,10 @@ public interface ConfigurationService
     Topology getTopologyForEpoch(long epoch);
 
     /**
-     * Method for reporting epochs the configuration service may not be aware of, and optionally running a supplied
-     * runnable once the corresponding topology has been received and applied. If the configuration service is already
-     * aware of the reported epoch, the runnable should be run immediately.
+     * Method for reporting epochs the configuration service may not be aware of. To be notified when the new epoch
+     * is available locally, use {@link accord.topology.TopologyManager#awaitEpoch(long)}
      */
-    Future<Void> fetchTopologyForEpoch(long epoch);
+    void fetchTopologyForEpoch(long epoch);
 
     /**
     * Alert the configuration service of epochs it may not be aware of. This is called for every TxnRequest
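
Note: the contract change above splits "fetch" from "wait". fetchTopologyForEpoch is now fire-and-forget, and callers that need the epoch locally block on TopologyManager.awaitEpoch instead. A minimal sketch of the new call pattern, assuming only the Node accessors used elsewhere in this patch (ensureEpoch is an illustrative name, not part of the API):

    void ensureEpoch(Node node, long epoch, Runnable onReady)
    {
        if (epoch > node.topology().epoch())
        {
            node.configService().fetchTopologyForEpoch(epoch);      // request only, no future returned
            node.topology().awaitEpoch(epoch).addListener(onReady); // wait for it to be applied locally
        }
        else
        {
            onReady.run(); // epoch already known locally
        }
    }
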
index 8a3777cc0c087749eac001546b3583e8ac8ba8ee..eb91821ee65ad1e67d36aead15f64e91a0be4080 100644 (file)
@@ -54,7 +54,7 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
         volatile long supersedingEpoch = -1;
         private final boolean fastPathPermitted;
         private final Set<Id> successes = new HashSet<>();
-        private final Set<Id> failures = new HashSet<>();
+        private Set<Id> failures;
 
         public PreacceptTracker(Topologies topologies, boolean fastPathPermitted)
         {
@@ -70,6 +70,8 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
         @Override
         public void recordFailure(Id node)
         {
+            if (failures == null)
+                failures = new HashSet<>();
             failures.add(node);
             super.recordFailure(node);
         }
@@ -103,7 +105,8 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
         {
             PreacceptTracker tracker = new PreacceptTracker(topologies, false);
             successes.forEach(tracker::recordSuccess);
-            failures.forEach(tracker::recordFailure);
+            if (failures != null)
+                failures.forEach(tracker::recordFailure);
             return tracker;
         }
 
@@ -112,6 +115,11 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
         {
             return fastPathPermitted && super.hasMetFastPathCriteria();
         }
+
+        boolean shouldSlowPathAccept()
+        {
+            return (!fastPathPermitted || !hasInFlight()) && hasReachedQuorum();
+        }
     }
 
     final Keys keys;
@@ -153,7 +161,7 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
             tryFailure(new Timeout());
 
         // if no other responses are expected and the slow quorum has been satisfied, proceed
-        if (shouldSlowPathAccept())
+        if (tracker.shouldSlowPathAccept())
             onPreAccepted();
     }
 
@@ -172,7 +180,7 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
         if (!needMessages.isEmpty())
             node.send(needMessages, to -> new PreAccept(to, newTopologies, txnId, txn), this);
 
-        if (shouldSlowPathAccept())
+        if (tracker.shouldSlowPathAccept())
             onPreAccepted();
     }
 
@@ -197,10 +205,13 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
         if (!fastPath && ok.witnessedAt.epoch > txnId.epoch)
         {
             if (tracker.recordSupersedingEpoch(ok.witnessedAt.epoch))
-                node.configService().fetchTopologyForEpoch(ok.witnessedAt.epoch).addListener(this::onEpochUpdate);
+            {
+                node.configService().fetchTopologyForEpoch(ok.witnessedAt.epoch);
+                node.topology().awaitEpoch(ok.witnessedAt.epoch).addListener(this::onEpochUpdate);
+            }
         }
 
-        if (!tracker.hasSupersedingEpoch() && (tracker.hasMetFastPathCriteria() || shouldSlowPathAccept()))
+        if (!tracker.hasSupersedingEpoch() && (tracker.hasMetFastPathCriteria() || tracker.shouldSlowPathAccept()))
             onPreAccepted();
     }
 
@@ -236,11 +247,6 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
         }
     }
 
-    private boolean shouldSlowPathAccept()
-    {
-        return (!tracker.fastPathPermitted || !tracker.hasInFlight()) && tracker.hasReachedQuorum();
-    }
-
     private boolean isPreAccepted()
     {
         return preacceptOutcome != null;
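
Note: tracker.shouldSlowPathAccept() (added above) is the old Agree predicate unchanged, relocated so it reads against the tracker's own state. Restated as a sketch with an intermediate name for clarity; hasInFlight() and hasReachedQuorum() are the tracker methods referenced in the hunk:

    boolean shouldSlowPathAccept()
    {
        // the fast path is still live only while it is permitted and
        // responses remain in flight; otherwise settle for a slow quorum
        boolean fastPathStillPossible = fastPathPermitted && hasInFlight();
        return !fastPathStillPossible && hasReachedQuorum();
    }
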
index b54d12471d15dce87c72e89998f61f632f43c2e3..c68d5babd6b541a422daea36cc81d61adc3ecd38 100644 (file)
@@ -14,9 +14,11 @@ public class Coordinate
     {
         long executeEpoch = agreed.executeAt.epoch;
         ConfigurationService configService = node.configService();
-        if (executeEpoch > configService.currentEpoch())
-            return configService.fetchTopologyForEpoch(executeEpoch)
-                                .flatMap(v -> fetchEpochOrExecute(node, agreed));
+        if (executeEpoch > node.topology().epoch())
+        {
+            configService.fetchTopologyForEpoch(executeEpoch);
+            return node.topology().awaitEpoch(executeEpoch).flatMap(v -> fetchEpochOrExecute(node, agreed));
+        }
 
         return Execute.execute(node, agreed);
     }
index 9dc62681e83f28b042b479e18a5ff24e97cf7999..942cde78275d77b255c186b3c7df00df4a10c09e 100644 (file)
@@ -112,11 +112,11 @@ public abstract class AbstractResponseTracker<T extends AbstractResponseTracker.
         return false;
     }
 
-    protected <V> V accumulate(BiFunction<T, V, V> function, V start)
+    protected <V> V foldl(BiFunction<T, V, V> function, V accumulator)
     {
         for (T tracker : trackers)
-            start = function.apply(tracker, start);
-        return start;
+            accumulator = function.apply(tracker, accumulator);
+        return accumulator;
     }
 
     public Set<Node.Id> nodes()
index 100bdd33d1df97db5f540b518de28147f94a58fe..3c526a2d1e3b0a114b236a9ac7a0fb2d9e4c41cb 100644 (file)
@@ -108,7 +108,7 @@ public class ReadTracker extends AbstractResponseTracker<ReadTracker.ReadShardTr
      */
     public Set<Id> computeMinimalReadSetAndMarkInflight()
     {
-        Set<ReadShardTracker> toRead = accumulate((tracker, accumulate) -> {
+        Set<ReadShardTracker> toRead = foldl((tracker, accumulate) -> {
             if (!tracker.shouldRead())
                 return accumulate;
 
index a520b600fc1ad1a2fc6d843c8a191eb6bdb007a8..1e3e1f8810d7d8c430ff5e0382e5f1c5c58ef7e7 100644 (file)
@@ -354,10 +354,10 @@ public class Command implements Listener, Consumer<Listener>
     {
         return "Command{" +
                "txnId=" + txnId +
+               ", status=" + status +
                ", txn=" + txn +
                ", executeAt=" + executeAt +
                ", deps=" + deps +
-               ", status=" + status +
                '}';
     }
 }
index 3241cd7f59cc734f12bee2fe8970edc4f4c0230f..ae1c1535e811b0b1383a45d1d7e69a5798607215 100644 (file)
@@ -4,12 +4,13 @@ import accord.api.Agent;
 import accord.api.Key;
 import accord.api.KeyRange;
 import accord.api.Store;
+import accord.local.CommandStores.StoreGroup;
+import accord.local.Node.Id;
 import accord.topology.KeyRanges;
 import accord.topology.Topology;
 import accord.txn.Keys;
 import accord.txn.Timestamp;
 import accord.txn.TxnId;
-import com.google.common.base.Preconditions;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.Promise;
@@ -21,6 +22,9 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import com.google.common.base.Preconditions;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
+
 /**
  * Single threaded internal shard of accord transaction metadata
  */
@@ -37,9 +41,6 @@ public abstract class CommandStore
                             Store store,
                             KeyRanges ranges,
                             Supplier<Topology> localTopologySupplier);
-        Factory SYNCHRONIZED = Synchronized::new;
-        Factory SINGLE_THREAD = SingleThread::new;
-        Factory SINGLE_THREAD_DEBUG = SingleThreadDebug::new;
     }
 
     private final int generation;
@@ -126,27 +127,6 @@ public abstract class CommandStore
         return ranges;
     }
 
-    void purgeRanges(KeyRanges removed)
-    {
-        for (KeyRange range : removed)
-        {
-            NavigableMap<Key, CommandsForKey> subMap = commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive());
-            Iterator<Key> keyIterator = subMap.keySet().iterator();
-            while (keyIterator.hasNext())
-            {
-                Key key = keyIterator.next();
-                CommandsForKey forKey = commandsForKey.get(key);
-                if (forKey != null)
-                {
-                    for (Command command : forKey)
-                        if (command.txn() != null && !ranges.intersects(command.txn().keys))
-                            commands.remove(command.txnId());
-                }
-                keyIterator.remove();
-            }
-        }
-    }
-
     public void forEpochCommands(KeyRanges ranges, long epoch, Consumer<Command> consumer)
     {
         Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, Integer.MIN_VALUE, Node.Id.NONE);
@@ -193,7 +173,7 @@ public abstract class CommandStore
 
     public boolean hashIntersects(Key key)
     {
-        return CommandStores.keyIndex(key, numShards) == index;
+        return StoreGroup.keyIndex(key, numShards) == index;
     }
 
     public boolean intersects(Keys keys)
@@ -201,6 +181,11 @@ public abstract class CommandStore
         return keys.any(ranges, this::hashIntersects);
     }
 
+    public boolean contains(Key key)
+    {
+        return ranges.contains(key);
+    }
+
     public static void onEach(Collection<CommandStore> stores, Consumer<? super CommandStore> consumer)
     {
         for (CommandStore store : stores)
@@ -234,6 +219,8 @@ public abstract class CommandStore
 
     public abstract Future<Void> process(Consumer<? super CommandStore> consumer);
 
+    public abstract <T> Future<T> process(Function<? super CommandStore, T> function);
+
     public void processBlocking(Consumer<? super CommandStore> consumer)
     {
         try
@@ -242,7 +229,7 @@ public abstract class CommandStore
         }
         catch (InterruptedException e)
         {
-            Thread.currentThread().interrupt();
+            throw new UncheckedInterruptedException(e);
         }
         catch (ExecutionException e)
         {
@@ -254,15 +241,10 @@ public abstract class CommandStore
 
     public static class Synchronized extends CommandStore
     {
-        public Synchronized(int generation,
-                            int index,
-                            int numShards,
-                            Node.Id nodeId,
-                            Function<Timestamp, Timestamp> uniqueNow,
-                            Agent agent,
-                            Store store,
-                            KeyRanges ranges,
-                            Supplier<Topology> localTopologySupplier)
+        public Synchronized(int generation, int index, int numShards, Node.Id nodeId,
+                                        Function<Timestamp, Timestamp> uniqueNow,
+                                        Agent agent, Store store,
+                                        KeyRanges ranges, Supplier<Topology> localTopologySupplier)
         {
             super(generation, index, numShards, nodeId, uniqueNow, agent, store, ranges, localTopologySupplier);
         }
@@ -275,6 +257,14 @@ public abstract class CommandStore
             return promise;
         }
 
+        @Override
+        public <T> Future<T> process(Function<? super CommandStore, T> function)
+        {
+            AsyncPromise<T> promise = new AsyncPromise<>();
+            processInternal(function, promise);
+            return promise;
+        }
+
         @Override
         public void shutdown() {}
     }
@@ -299,15 +289,26 @@ public abstract class CommandStore
             }
         }
 
-        public SingleThread(int generation,
-                            int index,
-                            int numShards,
-                            Node.Id nodeId,
-                            Function<Timestamp, Timestamp> uniqueNow,
-                            Agent agent,
-                            Store store,
-                            KeyRanges ranges,
-                            Supplier<Topology> localTopologySupplier)
+        private class FunctionWrapper<T> extends AsyncPromise<T> implements Runnable
+        {
+            private final Function<? super CommandStore, T> function;
+
+            public FunctionWrapper(Function<? super CommandStore, T> function)
+            {
+                this.function = function;
+            }
+
+            @Override
+            public void run()
+            {
+                processInternal(function, this);
+            }
+        }
+
+        public SingleThread(int generation, int index, int numShards, Node.Id nodeId,
+                                        Function<Timestamp, Timestamp> uniqueNow,
+                                        Agent agent, Store store,
+                                        KeyRanges ranges, Supplier<Topology> localTopologySupplier)
         {
             super(generation, index, numShards, nodeId, uniqueNow, agent, store, ranges, localTopologySupplier);
             executor = Executors.newSingleThreadExecutor(r -> {
@@ -325,6 +326,14 @@ public abstract class CommandStore
             return future;
         }
 
+        @Override
+        public <T> Future<T> process(Function<? super CommandStore, T> function)
+        {
+            FunctionWrapper<T> future = new FunctionWrapper<>(function);
+            executor.execute(future);
+            return future;
+        }
+
         @Override
         public void shutdown()
         {
@@ -332,19 +341,14 @@ public abstract class CommandStore
         }
     }
 
-    public static class SingleThreadDebug extends SingleThread
+    static class Debug extends SingleThread
     {
         private final AtomicReference<Thread> expectedThread = new AtomicReference<>();
 
-        public SingleThreadDebug(int generation,
-                                 int index,
-                                 int numShards,
-                                 Node.Id nodeId,
-                                 Function<Timestamp, Timestamp> uniqueNow,
-                                 Agent agent,
-                                 Store store,
-                                 KeyRanges ranges,
-                                 Supplier<Topology> localTopologySupplier)
+        public Debug(int generation, int index, int numShards, Node.Id nodeId,
+                     Function<Timestamp, Timestamp> uniqueNow,
+                     Agent agent, Store store,
+                     KeyRanges ranges, Supplier<Topology> localTopologySupplier)
         {
             super(generation, index, numShards, nodeId, uniqueNow, agent, store, ranges, localTopologySupplier);
         }
@@ -405,4 +409,5 @@ public abstract class CommandStore
             super.processInternal(consumer, future);
         }
     }
+
 }
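
Note: the new abstract process(Function) mirrors the existing process(Consumer) but carries a result back on the returned Future, so callers can query a store without blocking its thread. A hedged usage sketch (txnId is illustrative; Runnable-style addListener is used the same way on futures elsewhere in this patch):

    Future<Timestamp> executeAt = commandStore.process(cs -> cs.command(txnId).executeAt());
    executeAt.addListener(() -> { /* runs once the store thread has answered */ });
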
index 8e5886c00a38df3c58c665aab7d74a5b95c3f2f1..68efb685b8a5ff22ccd958692825679a6f96181e 100644 (file)
@@ -3,25 +3,36 @@ package accord.local;
 import accord.api.Agent;
 import accord.api.Key;
 import accord.api.Store;
+import accord.local.CommandStore.SingleThread;
+import accord.local.CommandStores.StoreGroups.Fold;
+import accord.local.Node.Id;
 import accord.messages.TxnRequest;
 import accord.topology.KeyRanges;
 import accord.topology.Topology;
 import accord.txn.Keys;
 import accord.txn.Timestamp;
-import com.google.common.base.Preconditions;
 import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.function.*;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
+
+import static java.lang.Boolean.FALSE;
 
 /**
  * Manages the single threaded metadata shards
  */
-public class CommandStores
+public abstract class CommandStores
 {
+    public interface Factory
+    {
+        CommandStores create(int num, Node.Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store);
+    }
+
     static class StoreGroup
     {
         final CommandStore[] stores;
@@ -29,85 +40,53 @@ public class CommandStores
 
         public StoreGroup(CommandStore[] stores, KeyRanges ranges)
         {
+            Preconditions.checkArgument(stores.length <= 64);
             this.stores = stores;
             this.ranges = ranges;
         }
 
-        private static class AccumulatingBitset extends BitSet implements Keys.KeyAccumulator<Keys>
+        long all()
         {
-            final int numStores;
-            boolean isDone = false;
-            public AccumulatingBitset(int numStores)
-            {
-                super(numStores);
-                this.numStores = numStores;
-            }
-
-            @Override
-            public Keys accumulate(Key key, Keys keys)
-            {
-                int idx = keyIndex(key, numStores);
-                if (get(idx))
-                    return keys;
-                set(idx);
-                isDone = cardinality() == numStores;
-                return keys;
-            }
-
-            @Override
-            public boolean isDone()
-            {
-                return isDone;
-            }
+            return -1L >>> (64 - stores.length);
         }
 
-        public Stream<CommandStore> stream()
+        long matches(Keys keys)
         {
-            return StreamSupport.stream(new ShardSpliterator(stores), false);
+            return keys.foldl(ranges, StoreGroup::addKeyIndex, stores.length, 0L, -1L);
         }
 
-        public Stream<CommandStore> stream(Keys keys)
+        long matches(TxnRequest.Scope scope)
         {
-            AccumulatingBitset bitSet = new AccumulatingBitset(stores.length);
-            keys.accumulate(ranges, bitSet, keys);
-            if (bitSet.cardinality() == 0)
-                return null;
-            return StreamSupport.stream(new ShardSpliterator(stores, bitSet::get), false);
+            return matches(scope.keys());
         }
 
-        public Stream<CommandStore> stream(TxnRequest.Scope scope)
+        static long keyIndex(Key key, long numShards)
         {
-            AccumulatingBitset bitSet = new AccumulatingBitset(stores.length);
-            for (int i=0, mi=scope.size(); i<mi; i++)
-            {
-                Keys keys = scope.get(i).keys;
-                keys.accumulate(ranges, bitSet, keys);
-                if (bitSet.isDone)
-                    break;
-            }
-            if (bitSet.cardinality() == 0)
-                return null;
-            return StreamSupport.stream(new ShardSpliterator(stores, bitSet::get), false);
+            return Integer.toUnsignedLong(key.keyHash()) % numShards;
         }
-    }
 
-    static int keyIndex(Key key, int numShards)
-    {
-        return (int) (Integer.toUnsignedLong(key.keyHash()) % numShards);
+        private static long addKeyIndex(Key key, long numShards, long accumulate)
+        {
+            return accumulate | (1L << keyIndex(key, numShards));
+        }
     }
 
     static class StoreGroups
     {
-        static final StoreGroups EMPTY = new StoreGroups(new StoreGroup[0], Topology.EMPTY, Topology.EMPTY);
         final StoreGroup[] groups;
         final Topology global;
         final Topology local;
+        final int size;
 
         public StoreGroups(StoreGroup[] groups, Topology global, Topology local)
         {
             this.groups = groups;
             this.global = global;
             this.local = local;
+            int size = 0;
+            for (StoreGroup group : groups)
+                size += group.stores.length;
+            this.size = size;
         }
 
         StoreGroups withNewTopology(Topology global, Topology local)
@@ -115,43 +94,79 @@ public class CommandStores
             return new StoreGroups(groups, global, local);
         }
 
-        public Stream<CommandStore> stream()
+        public int size()
         {
-            Stream<CommandStore> stream = null;
-            for (StoreGroup group : groups)
-            {
-                Stream<CommandStore> nextStream = group.stream();
-                if (nextStream == null) continue;
-                stream = stream != null ? Stream.concat(stream, nextStream) : nextStream;
-            }
-
-            return stream != null ? stream : Stream.empty();
+            return size;
         }
 
-        public Stream<CommandStore> stream(Keys keys)
+        private <I1, I2, O> O foldl(int startGroup, long bitset, Fold<? super I1, ? super I2, O> fold, I1 param1, I2 param2, O accumulator)
         {
-            Stream<CommandStore> stream = null;
-            for (StoreGroup group : groups)
+            int groupIndex = startGroup;
+            StoreGroup group = groups[groupIndex];
+            int offset = 0;
+            while (true)
             {
-                Stream<CommandStore> nextStream = group.stream(keys);
-                if (nextStream == null) continue;
-                stream = stream != null ? Stream.concat(stream, nextStream) : nextStream;
+                int i = Long.numberOfTrailingZeros(bitset) - offset;
+                while (i < group.stores.length)
+                {
+                    accumulator = fold.fold(group.stores[i], param1, param2, accumulator);
+                    bitset ^= Long.lowestOneBit(bitset);
+                    i = Long.numberOfTrailingZeros(bitset) - offset;
+                }
+
+                if (++groupIndex == groups.length)
+                    break;
+
+                if (bitset == 0)
+                    break;
+
+                offset += group.stores.length;
+                group = groups[groupIndex];
+                if (offset + group.stores.length > 64)
+                    break;
             }
+            return accumulator;
+        }
 
-            return stream != null ? stream : Stream.empty();
+        interface Fold<I1, I2, O>
+        {
+            O fold(CommandStore store, I1 i1, I2 i2, O accumulator);
         }
 
-        public Stream<CommandStore> stream(TxnRequest.Scope scope)
+        <S, I1, I2, O> O foldl(ToLongBiFunction<StoreGroup, S> select, S scope, Fold<? super I1, ? super I2, O> fold, I1 param1, I2 param2, IntFunction<? extends O> factory)
         {
-            Stream<CommandStore> stream = null;
-            for (StoreGroup group : groups)
+            O accumulator = null;
+            int startGroup = 0;
+            while (startGroup < groups.length)
             {
-                Stream<CommandStore> nextStream = group.stream(scope);
-                if (nextStream == null) continue;
-                stream = stream != null ? Stream.concat(stream, nextStream) : nextStream;
+                long bits = select.applyAsLong(groups[startGroup], scope);
+                if (bits == 0)
+                {
+                    ++startGroup;
+                    continue;
+                }
+
+                int offset = groups[startGroup].stores.length;
+                int endGroup = startGroup + 1;
+                while (endGroup < groups.length)
+                {
+                    StoreGroup group = groups[endGroup];
+                    if (offset + group.stores.length > 64)
+                        break;
+
+                    bits += select.applyAsLong(group, scope) << offset;
+                    offset += group.stores.length;
+                    ++endGroup;
+                }
+
+                if (accumulator == null)
+                    accumulator = factory.apply(Long.bitCount(bits));
+
+                accumulator = foldl(startGroup, bits, fold, param1, param2, accumulator);
+                startGroup = endGroup;
             }
 
-            return stream != null ? stream : Stream.empty();
+            return accumulator;
         }
     }
 
@@ -161,7 +176,7 @@ public class CommandStores
     private final Store store;
     private final CommandStore.Factory shardFactory;
     private final int numShards;
-    private volatile StoreGroups groups = StoreGroups.EMPTY;
+    protected volatile StoreGroups groups = new StoreGroups(new StoreGroup[0], Topology.EMPTY, Topology.EMPTY);
 
     public CommandStores(int num, Node.Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store, CommandStore.Factory shardFactory)
     {
@@ -190,19 +205,43 @@ public class CommandStores
                 commandStore.shutdown();
     }
 
-    public Stream<CommandStore> stream()
+    protected abstract <S> void forEach(ToLongBiFunction<StoreGroup, S> select, S scope, Consumer<? super CommandStore> forEach);
+    protected abstract <S, T> T mapReduce(ToLongBiFunction<StoreGroup, S> select, S scope, Function<? super CommandStore, T> map, BiFunction<T, T, T> reduce);
+
+    public void forEach(Consumer<CommandStore> forEach)
+    {
+        forEach((s, i) -> s.all(), null, forEach);
+    }
+
+    public void forEach(Keys keys, Consumer<CommandStore> forEach)
+    {
+        forEach(StoreGroup::matches, keys, forEach);
+    }
+
+    public void forEach(TxnRequest.Scope scope, Consumer<CommandStore> forEach)
     {
-        return groups.stream();
+        forEach(StoreGroup::matches, scope, forEach);
     }
 
-    public Stream<CommandStore> forKeys(Keys keys)
+    public <T> T mapReduce(TxnRequest.Scope scope, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
     {
-        return groups.stream(keys);
+        return mapReduce(StoreGroup::matches, scope, map, reduce);
     }
 
-    public Stream<CommandStore> forScope(TxnRequest.Scope scope)
+    public <T extends Collection<CommandStore>> T collect(Keys keys, IntFunction<T> factory)
     {
-        return groups.stream(scope);
+        return groups.foldl(StoreGroup::matches, keys, CommandStores::append, null, null, factory);
+    }
+
+    public <T extends Collection<CommandStore>> T collect(TxnRequest.Scope scope, IntFunction<T> factory)
+    {
+        return groups.foldl(StoreGroup::matches, scope, CommandStores::append, null, null, factory);
+    }
+
+    private static <T extends Collection<CommandStore>> T append(CommandStore store, Object ignore1, Object ignore2, T to)
+    {
+        to.add(store);
+        return to;
     }
 
     public synchronized void updateTopology(Topology cluster)
@@ -214,8 +253,15 @@ public class CommandStores
             return;
 
         Topology local = cluster.forNode(node);
-        KeyRanges currentRanges = Arrays.stream(current.groups).map(group -> group.ranges).reduce(KeyRanges.EMPTY, (l, r) -> l.union(r)).mergeTouching();
-        KeyRanges added = local.ranges().difference(currentRanges);
+        KeyRanges added = local.ranges().difference(current.local.ranges());
+
+        for (StoreGroup group : groups.groups)
+        {
+            // FIXME: remove this (and the corresponding check in TopologyRandomizer) once lower bounds are implemented.
+            //  In the meantime, the logic needed to support acquiring ranges that we previously replicated is pretty
+            //  convoluted without the ability to jettison epochs.
+            Preconditions.checkState(!group.ranges.intersects(added));
+        }
 
         if (added.isEmpty())
         {
@@ -236,90 +282,97 @@ public class CommandStores
         groups = new StoreGroups(newGroups, cluster, local);
     }
 
-    private static class ShardSpliterator implements Spliterator<CommandStore>
+    @VisibleForTesting
+    public CommandStore unsafeForKey(Key key)
     {
-        int i = 0;
-        final CommandStore[] commandStores;
-        final IntPredicate predicate;
-
-        public ShardSpliterator(CommandStore[] commandStores, IntPredicate predicate)
+        for (StoreGroup group : groups.groups)
         {
-            this.commandStores = commandStores;
-            this.predicate = predicate;
+            if (group.ranges.contains(key))
+            {
+                for (CommandStore store : group.stores)
+                {
+                    if (store.hashIntersects(key))
+                        return store;
+                }
+            }
         }
+        throw new IllegalArgumentException();
+    }
 
-        public ShardSpliterator(CommandStore[] commandStores)
+    public static class Synchronized extends CommandStores
+    {
+        public Synchronized(int num, Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
         {
-            this (commandStores, i -> true);
+            super(num, node, uniqueNow, agent, store, CommandStore.Synchronized::new);
         }
 
         @Override
-        public boolean tryAdvance(Consumer<? super CommandStore> action)
+        protected <S, T> T mapReduce(ToLongBiFunction<StoreGroup, S> select, S scope, Function<? super CommandStore, T> map, BiFunction<T, T, T> reduce)
         {
-            while (i < commandStores.length)
-            {
-                int idx = i++;
-                if (!predicate.test(idx))
-                    continue;
-                try
-                {
-                    commandStores[idx].process(action).get();
-                    break;
-                }
-                catch (InterruptedException | ExecutionException e)
-                {
-                    throw new RuntimeException(e);
-                }
-
-            }
-            return i < commandStores.length;
+            return groups.foldl(select, scope, (store, f, r, t) -> t == null ? f.apply(store) : r.apply(t, f.apply(store)), map, reduce, ignore -> null);
         }
 
         @Override
-        public void forEachRemaining(Consumer<? super CommandStore> action)
+        protected <S> void forEach(ToLongBiFunction<StoreGroup, S> select, S scope, Consumer<? super CommandStore> forEach)
         {
-            if (i >= commandStores.length)
-                return;
+            groups.foldl(select, scope, (store, f, r, t) -> { f.accept(store); return null; }, forEach, null, ignore -> FALSE);
+        }
+    }
 
-            List<Future<Void>> futures = new ArrayList<>(commandStores.length - i);
-            for (; i< commandStores.length; i++)
-            {
-                if (predicate.test(i))
-                    futures.add(commandStores[i].process(action));
-            }
+    public static class SingleThread extends CommandStores
+    {
+        public SingleThread(int num, Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
+        {
+            this(num, node, uniqueNow, agent, store, CommandStore.SingleThread::new);
+        }
 
-            try
-            {
-                for (int i=0, mi=futures.size(); i<mi; i++)
-                    futures.get(i).get();
-            }
-            catch (InterruptedException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (ExecutionException e)
+        public SingleThread(int num, Node.Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store, CommandStore.Factory shardFactory)
+        {
+            super(num, node, uniqueNow, agent, store, shardFactory);
+        }
+
+        private <S, F, T> T mapReduce(ToLongBiFunction<StoreGroup, S> select, S scope, F f, Fold<F, ?, List<Future<T>>> fold, BiFunction<T, T, T> reduce)
+        {
+            List<Future<T>> futures = groups.foldl(select, scope, fold, f, null, ArrayList::new);
+            T result = null;
+            for (Future<T> future : futures)
             {
-                Throwable cause = e.getCause();
-                throw new RuntimeException(cause != null ? cause : e);
+                try
+                {
+                    T next = future.get();
+                    if (result == null) result = next;
+                    else result = reduce.apply(result, next);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new UncheckedInterruptedException(e);
+                }
+                catch (ExecutionException e)
+                {
+                    throw new RuntimeException(e.getCause());
+                }
             }
+            return result;
         }
 
         @Override
-        public Spliterator<CommandStore> trySplit()
+        protected <S, T> T mapReduce(ToLongBiFunction<StoreGroup, S> select, S scope, Function<? super CommandStore, T> map, BiFunction<T, T, T> reduce)
         {
-            return null;
+            return mapReduce(select, scope, map, (store, f, i, t) -> { t.add(store.process(f)); return t; }, reduce);
         }
 
-        @Override
-        public long estimateSize()
+        protected <S> void forEach(ToLongBiFunction<StoreGroup, S> select, S scope, Consumer<? super CommandStore> forEach)
         {
-            return commandStores.length;
+            mapReduce(select, scope, forEach, (store, f, i, t) -> { t.add(store.process(f)); return t; }, (Void i1, Void i2) -> null);
         }
+    }
 
-        @Override
-        public int characteristics()
+    public static class Debug extends SingleThread
+    {
+        public Debug(int num, Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
         {
-            return Spliterator.SIZED | Spliterator.NONNULL | Spliterator.DISTINCT | Spliterator.IMMUTABLE;
+            super(num, node, uniqueNow, agent, store, CommandStore.Debug::new);
         }
     }
+
 }
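
Note: the stream-based ShardSpliterator is replaced by a bitmask scheme: a group holds at most 64 stores (enforced by the new Preconditions check), each key hashes to a shard index, and the stores a request touches are encoded as set bits in a long. A self-contained sketch of the two core tricks, using illustrative names:

    // one bit per shard that any key hashes to
    static long maskFor(int[] keyHashes, int numShards)
    {
        long mask = 0;
        for (int hash : keyHashes)
            mask |= 1L << (Integer.toUnsignedLong(hash) % numShards);
        return mask; // "all shards" is -1L >>> (64 - numShards)
    }

    // visit only the set bits, as StoreGroups.foldl does above
    static void forEachSetBit(long bitset, java.util.function.IntConsumer visit)
    {
        while (bitset != 0)
        {
            visit.accept(Long.numberOfTrailingZeros(bitset));
            bitset ^= Long.lowestOneBit(bitset); // clear the lowest set bit
        }
    }
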
index 20fd1e02609ecdd6c51779a6e974c4d5ef1d56fb..4f53814319491bf52fa5dc629a9c8b7fa34159c1 100644 (file)
@@ -60,4 +60,9 @@ public class CommandsForKey implements Listener, Iterable<Command>
     {
         return Iterators.concat(uncommitted.values().iterator(), committedByExecuteAt.values().iterator());
     }
+
+    public boolean isEmpty()
+    {
+        return uncommitted.isEmpty() && committedById.isEmpty();
+    }
 }
index 9a37a030de813bcc1fbe4fdd3e718cbac2376b60..e70e9f143f4bf7c2267ae506301a0b03f490f295 100644 (file)
@@ -4,10 +4,14 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.IntFunction;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
-import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
 
 import accord.api.*;
 import accord.coordinate.Coordinate;
@@ -90,7 +94,7 @@ public class Node implements ConfigurationService.Listener
     private final Set<TxnId> pendingRecovery = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
     public Node(Id id, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier,
-                Supplier<Store> dataSupplier, Agent agent, Scheduler scheduler, CommandStore.Factory commandStoreFactory)
+                Supplier<Store> dataSupplier, Agent agent, Scheduler scheduler, CommandStores.Factory factory)
     {
         this.id = id;
         this.agent = agent;
@@ -101,12 +105,7 @@ public class Node implements ConfigurationService.Listener
         this.now = new AtomicReference<>(new Timestamp(topology.epoch(), nowSupplier.getAsLong(), 0, id));
         this.nowSupplier = nowSupplier;
         this.scheduler = scheduler;
-        this.commandStores = new CommandStores(numCommandShards(),
-                                               id,
-                                               this::uniqueNow,
-                                               agent,
-                                               dataSupplier.get(),
-                                               commandStoreFactory);
+        this.commandStores = factory.create(numCommandShards(), id, this::uniqueNow, agent, dataSupplier.get());
 
         configService.registerListener(this);
         onTopologyUpdate(topology, false);
@@ -188,31 +187,39 @@ public class Node implements ConfigurationService.Listener
         return nowSupplier.getAsLong();
     }
 
-    public Stream<CommandStore> local(Keys keys)
+    public void forEachLocal(Consumer<CommandStore> forEach)
     {
-        return commandStores.forKeys(keys);
+        commandStores.forEach(forEach);
     }
 
-    public Stream<CommandStore> local(Txn txn)
+    public void forEachLocal(Keys keys, Consumer<CommandStore> forEach)
     {
-        return commandStores.forKeys(txn.keys());
+        commandStores.forEach(keys, forEach);
     }
 
-    public Stream<CommandStore> local(TxnRequest.Scope scope)
+    public void forEachLocal(Txn txn, Consumer<CommandStore> forEach)
     {
-        return commandStores.forScope(scope);
+        forEachLocal(txn.keys, forEach);
     }
 
-    public Stream<CommandStore> local()
+    public void forEachLocal(TxnRequest.Scope scope, Consumer<CommandStore> forEach)
     {
-        return commandStores.stream();
+        commandStores.forEach(scope, forEach);
     }
 
-    public Optional<CommandStore> local(Key key)
+    public <T> T mapReduceLocal(TxnRequest.Scope scope, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
     {
-        return local(Keys.of(key)).reduce((i1, i2) -> {
-            throw new IllegalStateException("more than one instance encountered for key");
-        });
+        return commandStores.mapReduce(scope, map, reduce);
+    }
+
+    public <T extends Collection<CommandStore>> T collectLocal(Keys keys, IntFunction<T> factory)
+    {
+        return commandStores.collect(keys, factory);
+    }
+
+    public <T extends Collection<CommandStore>> T collectLocal(TxnRequest.Scope scope, IntFunction<T> factory)
+    {
+        return commandStores.collect(scope, factory);
     }
 
     // send to every node besides ourselves
@@ -284,8 +291,11 @@ public class Node implements ConfigurationService.Listener
         // TODO: The combination of updating the epoch of the next timestamp with epochs we don’t have topologies for,
         //  and requiring preaccept to talk to its topology epoch means that learning of a new epoch via timestamp
         //  (ie not via config service) will halt any new txns from a node until it receives this topology
-        if (txnId.epoch > configService.currentEpoch())
-            return configService.fetchTopologyForEpoch(txnId.epoch).flatMap(v -> coordinate(txnId, txn));
+        if (txnId.epoch > topology().epoch())
+        {
+            configService.fetchTopologyForEpoch(txnId.epoch);
+            return topology().awaitEpoch(txnId.epoch).flatMap(v -> coordinate(txnId, txn));
+        }
 
         Future<Result> result = Coordinate.execute(this, txnId, txn);
         coordinating.put(txnId, result);
@@ -309,9 +319,10 @@ public class Node implements ConfigurationService.Listener
     // TODO: encapsulate in Coordinate, so we can request that e.g. commits be re-sent?
     public void recover(TxnId txnId, Txn txn)
     {
-        if (txnId.epoch > configService.currentEpoch())
+        if (txnId.epoch > topology.epoch())
         {
-            configService.fetchTopologyForEpoch(txnId.epoch).addListener(() -> recover(txnId, txn));
+            configService.fetchTopologyForEpoch(txnId.epoch);
+            topology().awaitEpoch(txnId.epoch).addListener(() -> recover(txnId, txn));
             return;
         }
 
@@ -319,12 +330,6 @@ public class Node implements ConfigurationService.Listener
         if (result != null)
             return;
 
-        if (txnId.epoch > topology().epoch())
-        {
-            configService().fetchTopologyForEpoch(txnId.epoch).addListener(() -> recover(txnId, txn));
-            return;
-        }
-
         result = Coordinate.recover(this, txnId, txn);
         coordinating.putIfAbsent(txnId, result);
         // TODO (now): error handling
@@ -343,7 +348,8 @@ public class Node implements ConfigurationService.Listener
         long unknownEpoch = topology().maxUnknownEpoch(request);
         if (unknownEpoch > 0)
         {
-            configService.fetchTopologyForEpoch(unknownEpoch).addListener(() -> receive(request, from, replyContext));
+            configService.fetchTopologyForEpoch(unknownEpoch);
+            topology().awaitEpoch(unknownEpoch).addListener(() -> receive(request, from, replyContext));
             return;
         }
         scheduler.now(() -> request.process(this, from, replyContext));
@@ -369,4 +375,11 @@ public class Node implements ConfigurationService.Listener
     {
         return "Node{" + id + '}';
     }
+
+    @VisibleForTesting
+    public CommandStore unsafeForKey(Key key)
+    {
+        return commandStores.unsafeForKey(key);
+    }
+
 }
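
Note: message handlers previously collected node.local(scope) streams and reduced with orElseThrow; the new mapReduceLocal hides that plumbing. A sketch of the idiom the Accept/PreAccept/BeginRecovery hunks below follow (computeReply and merge are hypothetical helpers standing in for the per-message logic):

    Reply reply = node.mapReduceLocal(scope(),
            instance -> computeReply(instance),          // map: one reply per intersecting store
            (r1, r2) -> r1.isOK() ? merge(r1, r2) : r1); // reduce: merge pairwise, propagate nacks
    node.reply(replyToNode, replyContext, reply);
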
index a9c581a032e7e69e0f7a83db8d85e8799f00d03a..b9dc1c24f0bba595b3197fe3c7cb0c54999bf5e1 100644 (file)
@@ -36,12 +36,12 @@ public class Accept extends TxnRequest
 
     public void process(Node on, Node.Id replyToNode, ReplyContext replyContext)
     {
-        on.reply(replyToNode, replyContext, on.local(scope()).map(instance -> {
+        on.reply(replyToNode, replyContext, on.mapReduceLocal(scope(), instance -> {
             Command command = instance.command(txnId);
             if (!command.accept(ballot, txn, executeAt, deps))
                 return new AcceptNack(txnId, command.promised());
             return new AcceptOk(txnId, calculateDeps(instance, txnId, txn, executeAt));
-        }).reduce((r1, r2) -> {
+        }, (r1, r2) -> {
             if (!r1.isOK()) return r1;
             if (!r2.isOK()) return r2;
             AcceptOk ok1 = (AcceptOk) r1;
@@ -50,7 +50,7 @@ public class Accept extends TxnRequest
             if (ok2.deps.isEmpty()) return ok1;
             ok1.deps.addAll(ok2.deps);
             return ok1;
-        }).orElseThrow());
+        }));
     }
 
     @Override
index 7f296e64f99918a3c9ea64df63b959a7091b9f1e..a9173591e483179356f61323f761231c05428456 100644 (file)
@@ -38,7 +38,7 @@ public class Apply extends TxnRequest
 
     public void process(Node node, Id replyToNode, ReplyContext replyContext)
     {
-        node.local(scope()).forEach(instance -> instance.command(txnId).apply(txn, deps, executeAt, writes, result));
+        node.forEachLocal(scope(), instance -> instance.command(txnId).apply(txn, deps, executeAt, writes, result));
     }
 
     @Override
index 4d6e2e50cb8076f736f684051b369ccecd3dd92b..bf2d6b897634dde7d87c9dae9f3c2c8c449021b3 100644 (file)
@@ -43,7 +43,7 @@ public class BeginRecovery extends TxnRequest
 
     public void process(Node node, Id replyToNode, ReplyContext replyContext)
     {
-        RecoverReply reply = node.local(scope()).map(instance -> {
+        RecoverReply reply = node.mapReduceLocal(scope(), instance -> {
             Command command = instance.command(txnId);
 
             if (!command.recover(txn, ballot))
@@ -84,7 +84,7 @@ public class BeginRecovery extends TxnRequest
                                               .collect(Dependencies::new, Dependencies::add, Dependencies::addAll);
             }
             return new RecoverOk(txnId, command.status(), command.accepted(), command.executeAt(), deps, earlierCommittedWitness, earlierAcceptedNoWitness, rejectsFastPath, command.writes(), command.result());
-        }).reduce((r1, r2) -> {
+        }, (r1, r2) -> {
             if (!r1.isOK()) return r1;
             if (!r2.isOK()) return r2;
             RecoverOk ok1 = (RecoverOk) r1;
@@ -140,7 +140,7 @@ public class BeginRecovery extends TxnRequest
                     ok1.earlierAcceptedNoWitness,
                     ok1.rejectsFastPath | ok2.rejectsFastPath,
                     ok1.writes, ok1.result);
-        }).orElseThrow();
+        });
 
         node.reply(replyToNode, replyContext, reply);
         if (reply instanceof RecoverOk && ((RecoverOk) reply).status == Applied)
@@ -148,9 +148,10 @@ public class BeginRecovery extends TxnRequest
             // disseminate directly
             RecoverOk ok = (RecoverOk) reply;
             ConfigurationService configService = node.configService();
-            if (ok.executeAt.epoch > configService.currentEpoch())
+            if (ok.executeAt.epoch > node.topology().epoch())
             {
-                configService.fetchTopologyForEpoch(ok.executeAt.epoch).addListener(() -> disseminateApply(node, ok));
+                configService.fetchTopologyForEpoch(ok.executeAt.epoch);
+                node.topology().awaitEpoch(ok.executeAt.epoch).addListener(() -> disseminateApply(node, ok));
                 return;
             }
             disseminateApply(node, ok);
@@ -160,6 +161,12 @@ public class BeginRecovery extends TxnRequest
     private void disseminateApply(Node node, RecoverOk ok)
     {
         Preconditions.checkArgument(ok.status == Applied);
+        if (ok.executeAt.epoch > node.epoch())
+        {
+            node.configService().fetchTopologyForEpoch(ok.executeAt.epoch);
+            node.topology().awaitEpoch(ok.executeAt.epoch).addListener(() -> disseminateApply(node, ok));
+            return;
+        }
         Topologies topologies = node.topology().forKeys(txn.keys, ok.executeAt.epoch);
         node.send(topologies.nodes(), to -> new Apply(to, topologies, txnId, txn, ok.executeAt, ok.deps, ok.writes, ok.result));
     }
index 57d13e2198442ee124fba0b00547ef7e38bf5ef8..8bf512d671f52edc089f441bfd1aecb9ff49e337 100644 (file)
@@ -28,7 +28,7 @@ public class Commit extends ReadData
 
     public void process(Node node, Id from, ReplyContext replyContext)
     {
-        node.local(scope()).forEach(instance -> instance.command(txnId).commit(txn, deps, executeAt));
+        node.forEachLocal(scope(), instance -> instance.command(txnId).commit(txn, deps, executeAt));
         if (read) super.process(node, from, replyContext);
     }
 
index 0dcfe4b153c36c89803a179f0a7cb578394c3011..cee9cffefa6889abeaa0149e8759d68700995a88 100644 (file)
@@ -33,7 +33,7 @@ public class PreAccept extends TxnRequest
 
     public void process(Node node, Id from, ReplyContext replyContext)
     {
-        node.reply(from, replyContext, node.local(scope()).map(instance -> {
+        node.reply(from, replyContext, node.mapReduceLocal(scope(), instance -> {
             // note: this diverges from the paper, in that instead of waiting for JoinShard,
             //       we PreAccept to both old and new topologies and require quorums in both.
             //       This necessitates sending to ALL replicas of old topology, not only electorate (as fast path may be unreachable).
@@ -41,7 +41,7 @@ public class PreAccept extends TxnRequest
             if (!command.witness(txn))
                 return PreAcceptNack.INSTANCE;
             return new PreAcceptOk(txnId, command.executeAt(), calculateDeps(instance, txnId, txn, txnId));
-        }).reduce((r1, r2) -> {
+        }, (r1, r2) -> {
             if (!r1.isOK()) return r1;
             if (!r2.isOK()) return r2;
             PreAcceptOk ok1 = (PreAcceptOk) r1;
@@ -50,7 +50,7 @@ public class PreAccept extends TxnRequest
             if (ok1 != okMax && !ok1.deps.isEmpty()) okMax.deps.addAll(ok1.deps);
             if (ok2 != okMax && !ok2.deps.isEmpty()) okMax.deps.addAll(ok2.deps);
             return okMax;
-        }).orElseThrow());
+        }));
     }
 
     @Override
index e35d4bc16e5e323de9137720c9b69f1aa444ee88..6c08b970c9933abfc6af9f12068cf9f8014bb7ed 100644 (file)
@@ -124,7 +124,7 @@ public class ReadData extends TxnRequest
         synchronized void setup(TxnId txnId, Txn txn, Scope scope)
         {
             // TODO: simple hash set supporting concurrent modification, or else avoid concurrent modification
-            waitingOn = node.local(scope).collect(Collectors.toCollection(() -> new DeterministicIdentitySet<>()));
+            waitingOn = node.collectLocal(scope, DeterministicIdentitySet::new);
             // FIXME: fix/check thread safety
             CommandStore.onEach(waitingOn, instance -> {
                 Command command = instance.command(txnId);
index 606825b06fb87deb4a13ee4553cc77a297eb111b..a41b56e044b0dcabeb70da9c739c08dfd05c01dc 100644 (file)
@@ -7,9 +7,6 @@ import accord.topology.Topology;
 import accord.txn.Keys;
 import accord.txn.Txn;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import java.util.Objects;
 
 public abstract class TxnRequest implements Request
@@ -32,85 +29,37 @@ public abstract class TxnRequest implements Request
      */
     public static class Scope
     {
-        public static class KeysForEpoch
-        {
-            public final long epoch;
-            public final Keys keys;
-
-            public KeysForEpoch(long epoch, Keys keys)
-            {
-                this.epoch = epoch;
-                this.keys = keys;
-            }
-
-            static KeysForEpoch forTopology(Topology topology, Node.Id node, Keys keys)
-            {
-                KeyRanges topologyRanges = topology.rangesForNode(node);
-                if (topologyRanges == null)
-                    return null;
-                topologyRanges = topologyRanges.intersection(keys);
-                Keys scopeKeys = keys.intersection(topologyRanges);
-                return !topologyRanges.isEmpty() ? new KeysForEpoch(topology.epoch(), scopeKeys) : null;
-            }
-
-            @Override
-            public boolean equals(Object o)
-            {
-                if (this == o) return true;
-                if (o == null || getClass() != o.getClass()) return false;
-                KeysForEpoch that = (KeysForEpoch) o;
-                return epoch == that.epoch && keys.equals(that.keys);
-            }
-
-            @Override
-            public int hashCode()
-            {
-                return Objects.hash(epoch, keys);
-            }
-
-            @Override
-            public String toString()
-            {
-                return "EpochRanges{" +
-                        "epoch=" + epoch +
-                        ", keys=" + keys +
-                        '}';
-            }
-        }
-
-        private final long maxEpoch;
-        private final KeysForEpoch[] epochs;
-
-        public Scope(long maxEpoch, KeysForEpoch... epochKeys)
-        {
-            this.maxEpoch = maxEpoch;
-            this.epochs = epochKeys;
-        }
-
-        public int size()
-        {
-            return epochs.length;
-        }
+        private final long minRequiredEpoch;
+        private final Keys keys;
 
-        public KeysForEpoch get(int i)
+        public Scope(long minRequiredEpoch, Keys keys)
         {
-            return epochs[i];
+            this.minRequiredEpoch = minRequiredEpoch;
+            this.keys = keys;
         }
 
-        public static Scope forTopologies(Node.Id node, Topologies topologies, Keys keys)
+        public static Scope forTopologies(Node.Id node, Topologies topologies, Keys txnKeys)
         {
-            List<KeysForEpoch> ranges = new ArrayList<>(topologies.size());
+            long minEpoch = 0;
+            Keys scopeKeys = Keys.EMPTY;
+            Keys lastKeys = null;
             for (int i=topologies.size() - 1; i>=0; i--)
             {
                 Topology topology = topologies.get(i);
-                KeysForEpoch keysForEpoch = KeysForEpoch.forTopology(topology, node, keys);
-                if (keysForEpoch != null)
+                KeyRanges topologyRanges = topology.rangesForNode(node);
+                if (topologyRanges == null)
+                    continue;
+                topologyRanges = topologyRanges.intersection(txnKeys);
+                Keys epochKeys = txnKeys.intersection(topologyRanges);
+                if (lastKeys == null || !lastKeys.containsAll(epochKeys))
                 {
-                    ranges.add(keysForEpoch);
+                    minEpoch = topology.epoch();
+                    scopeKeys = scopeKeys.merge(epochKeys);
                 }
+                lastKeys = epochKeys;
             }
 
-            return new Scope(topologies.currentEpoch(), ranges.toArray(KeysForEpoch[]::new));
+            return new Scope(minEpoch, scopeKeys);
         }
 
         public static Scope forTopologies(Node.Id node, Topologies topologies, Txn txn)
@@ -118,27 +67,13 @@ public abstract class TxnRequest implements Request
             return forTopologies(node, topologies, txn.keys());
         }
 
-        public long maxEpoch()
-        {
-            return maxEpoch;
-        }
-
-        public boolean intersects(KeyRanges ranges)
+        public long minRequiredEpoch()
         {
-            for (KeysForEpoch keysForEpoch : this.epochs)
-            {
-                if (ranges.intersects(keysForEpoch.keys))
-                    return true;
-            }
-
-            return false;
+            return minRequiredEpoch;
         }
 
         public Keys keys()
         {
-            Keys keys = epochs[0].keys;
-            for (int i = 1; i< epochs.length; i++)
-                keys = keys.merge(epochs[i].keys);
             return keys;
         }
 
@@ -147,23 +82,22 @@ public abstract class TxnRequest implements Request
         {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
-            Scope that = (Scope) o;
-            return maxEpoch == that.maxEpoch && Arrays.equals(epochs, that.epochs);
+            Scope scope = (Scope) o;
+            return minRequiredEpoch == scope.minRequiredEpoch && keys.equals(scope.keys);
         }
 
         @Override
         public int hashCode()
         {
-            int result = Objects.hash(maxEpoch);
-            result = 31 * result + Arrays.hashCode(epochs);
-            return result;
+            return Objects.hash(minRequiredEpoch, keys);
         }
 
         @Override
         public String toString()
         {
-            return "TxnRequestScope{" +
-                    "epochs=" + Arrays.toString(epochs) +
+            return "Scope{" +
+                    "maxEpoch=" + minRequiredEpoch +
+                    ", keys=" + keys +
                     '}';
         }
     }
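
The flattened Scope means a recipient needs only one comparison to decide whether its
topology is recent enough. A minimal receiving-side sketch (the topology/configService
names and request.scope() accessor are illustrative, not from this patch), mirroring
maxUnknownEpoch() further down:

    long required = request.scope().minRequiredEpoch();
    if (topology.epoch() < required)
        configService.fetchTopologyForEpoch(required); // catch up before processing
    else
        process(request);
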
index a09324f2badb40372987e8d504df0d0c67aaabca..63aafb904a6142c4db612cfb8bbc53ab2c806ffd 100644 (file)
@@ -1,8 +1,8 @@
 package accord.messages;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import accord.local.*;
 import accord.local.Node.Id;
@@ -57,7 +57,7 @@ public class WaitOnCommit extends TxnRequest
                 node.reply(replyToNode, replyContext, WaitOnCommitOk.INSTANCE);
         }
 
-        void process(CommandStore instance)
+        void setup(CommandStore instance)
         {
             Command command = instance.command(txnId);
             switch (command.status())
@@ -78,9 +78,9 @@ public class WaitOnCommit extends TxnRequest
 
         synchronized void setup(Keys keys)
         {
-            List<CommandStore> instances = node.local(keys).collect(Collectors.toList());
+            List<CommandStore> instances = node.collectLocal(keys, ArrayList::new);
             waitingOn.set(instances.size());
-            instances.forEach(instance -> instance.processBlocking(this::process));
+            instances.forEach(instance -> instance.processBlocking(this::setup));
         }
     }
 
index 427a5e1d247831f97aea64401b8bef891bb0c5c0..de2ce604992ff7afce990c76987e856d19af2065 100644 (file)
@@ -64,6 +64,11 @@ public class KeyRanges implements Iterable<KeyRange>
         return rangeIndexForKey(0, ranges.length, key);
     }
 
+    public boolean contains(Key key)
+    {
+        return rangeIndexForKey(key) >= 0;
+    }
+
     public int size()
     {
         return ranges.length;
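
contains(Key) just checks the sign of the range binary search. A quick sketch using the
IntKey helpers that appear in the tests below (values invented; exact range boundary
semantics aside, 150 is clearly inside and 250 clearly outside):

    KeyRanges ranges = new KeyRanges(new KeyRange[]{ IntKey.range(100, 200) });
    assert ranges.contains(IntKey.key(150));   // falls inside the only range
    assert !ranges.contains(IntKey.key(250));  // no range covers it
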
index 8c88d320867b51e32e686f1d9c5ec1328c628cb6..dd250716fe5144592e4699d6e491375a5a928509 100644 (file)
@@ -118,11 +118,6 @@ public class Topology extends AbstractCollection<Shard>
         return supersetRangeIndexes.length < shards.length;
     }
 
-    public boolean isSubsetOf(Topology topology)
-    {
-        return epoch() == topology.epoch() && Arrays.equals(this.shards, topology.shards);
-    }
-
     public Topology withEpoch(long epoch)
     {
         return new Topology(epoch, shards, ranges, nodeLookup, subsetOfRanges, supersetRangeIndexes);
@@ -192,7 +187,7 @@ public class Topology extends AbstractCollection<Shard>
         return forKeys(select, (i, shard) -> true);
     }
 
-    public <T> T accumulateForKeys(Keys select, IndexedBiFunction<Shard, T, T> function, T start)
+    public <T> T foldl(Keys select, IndexedBiFunction<Shard, T, T> function, T accumulator)
     {
         int subsetIndex = 0;
         for (int i = 0 ; i < select.size() ; )
@@ -203,11 +198,11 @@ public class Topology extends AbstractCollection<Shard>
                 throw new IllegalArgumentException("Range not found for " + select.get(i));
             int supersetIndex = supersetRangeIndexes[subsetIndex];
             Shard shard = shards[supersetIndex];
-            start = function.apply(subsetIndex, shard, start);
+            accumulator = function.apply(subsetIndex, shard, accumulator);
             // find the first key outside this range
             i = shard.range.higherKeyIndex(select, i, select.size());
         }
-        return start;
+        return accumulator;
     }
 
     /**
index cd3ed84ebea682a3c6a29f342d94852e9457e35d..d3e9a92b00273fecaae7a9645e8ba66d3b35cbed 100644 (file)
@@ -9,6 +9,9 @@ import accord.txn.Keys;
 import accord.txn.Txn;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 
 import java.util.*;
 import java.util.function.LongConsumer;
@@ -25,13 +28,14 @@ import java.util.function.LongConsumer;
  */
 public class TopologyManager implements ConfigurationService.Listener
 {
+    private static final Future<Void> SUCCESS = ImmediateFuture.success(null);
     class EpochState
     {
         private final Topology global;
         private final Topology local;
         private final QuorumTracker syncTracker;
         private boolean syncComplete = false;
-        private boolean prevSynced = false;
+        private boolean prevSynced;
 
         EpochState(Topology global, boolean prevSynced)
         {
@@ -39,8 +43,7 @@ public class TopologyManager implements ConfigurationService.Listener
             this.global = global;
             this.local = global.forNode(node);
             this.syncTracker = new QuorumTracker(new Topologies.Singleton(global, false));
-            if (prevSynced)
-                markPrevSynced();
+            this.prevSynced = prevSynced;
         }
 
         void markPrevSynced()
@@ -73,7 +76,7 @@ public class TopologyManager implements ConfigurationService.Listener
                 return false;
             if (syncComplete)
                 return true;
-            Boolean result = global.accumulateForKeys(keys, (i, shard, acc) -> {
+            Boolean result = global.foldl(keys, (i, shard, acc) -> {
                 if (acc == Boolean.FALSE)
                     return acc;
                 return Boolean.valueOf(syncTracker.unsafeGet(i).hasReachedQuorum());
@@ -97,10 +100,16 @@ public class TopologyManager implements ConfigurationService.Listener
         // until the superseding epoch has been applied
         private final List<Set<Node.Id>> pendingSyncComplete;
 
-        private Epochs(EpochState[] epochs, List<Set<Node.Id>> pendingSyncComplete)
+        // list of promises to be completed as newer epochs become active. This is to support processes that
+        // are waiting on future epochs to begin (i.e. txn requests from future epochs). Index 0 is for
+        // currentEpoch + 1
+        private final List<AsyncPromise<Void>> futureEpochFutures;
+
+        private Epochs(EpochState[] epochs, List<Set<Node.Id>> pendingSyncComplete, List<AsyncPromise<Void>> futureEpochFutures)
         {
             this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0;
             this.pendingSyncComplete = pendingSyncComplete;
+            this.futureEpochFutures = futureEpochFutures;
             for (int i=1; i<epochs.length; i++)
                 Preconditions.checkArgument(epochs[i].epoch() == epochs[i-1].epoch() - 1);
             this.epochs = epochs;
@@ -108,7 +117,19 @@ public class TopologyManager implements ConfigurationService.Listener
 
         private Epochs(EpochState[] epochs)
         {
-            this(epochs, new ArrayList<>());
+            this(epochs, new ArrayList<>(), new ArrayList<>());
+        }
+
+        public Future<Void> awaitEpoch(long epoch)
+        {
+            if (epoch <= currentEpoch)
+                return SUCCESS;
+
+            int diff = (int) (epoch - currentEpoch);
+            while (futureEpochFutures.size() < diff)
+                futureEpochFutures.add(new AsyncPromise<>());
+
+            return futureEpochFutures.get(diff - 1);
         }
 
         public long nextEpoch()
@@ -121,25 +142,6 @@ public class TopologyManager implements ConfigurationService.Listener
             return epochs.length > 0 ? epochs[0].global : Topology.EMPTY;
         }
 
-        public Epochs add(Topology topology)
-        {
-            Preconditions.checkArgument(topology.epoch == nextEpoch());
-            EpochState[] nextEpochs = new EpochState[epochs.length + 1];
-            List<Set<Node.Id>> pendingSync = new ArrayList<>(pendingSyncComplete);
-            if (!pendingSync.isEmpty())
-            {
-                EpochState current = epochs[0];
-                if (epochs.length <= 1 || epochs[1].syncComplete())
-                    current.markPrevSynced();
-                pendingSync.remove(0).forEach(current::recordSyncComplete);
-            }
-            System.arraycopy(epochs, 0, nextEpochs, 1, epochs.length);
-
-            boolean prevSynced = epochs.length == 0 || epochs[0].syncComplete();
-            nextEpochs[0] = new EpochState(topology, prevSynced);
-            return new Epochs(nextEpochs, pendingSync);
-        }
-
         /**
          * Mark sync complete for the given node/epoch, and if this epoch
          * is now synced, update the prevSynced flag on superseding epochs
@@ -177,35 +179,8 @@ public class TopologyManager implements ConfigurationService.Listener
 
         long maxUnknownEpoch(TxnRequest.Scope scope)
         {
-            EpochState lastState = null;
-            for (int i=0, mi=scope.size(); i<mi; i++)
-            {
-                TxnRequest.Scope.KeysForEpoch requestRanges = scope.get(i);
-                EpochState epochState = get(requestRanges.epoch);
-
-                if (epochState != null)
-                {
-                    lastState = epochState;
-                }
-                else if (lastState != null && lastState.local.ranges().intersects(requestRanges.keys))
-                {
-                    // we don't have the most recent epoch, but still replicate the requested ranges
-                    continue;
-                }
-                else
-                {
-                    // we don't have the most recent epoch, and we don't replicate the requested ranges
-                    return scope.maxEpoch();
-                }
-
-                // validate requested ranges
-                KeyRanges localRanges = epochState.local.ranges();
-                if (!localRanges.intersects(requestRanges.keys))
-                    throw new RuntimeException("Received request for ranges not replicated by this node");
-            }
-            if (scope.maxEpoch() > 0)
-                epochReporter.accept(scope.maxEpoch());
-
+            if (currentEpoch < scope.minRequiredEpoch())
+                return scope.minRequiredEpoch();
             return 0;
         }
 
@@ -229,7 +204,33 @@ public class TopologyManager implements ConfigurationService.Listener
     @Override
     public synchronized void onTopologyUpdate(Topology topology)
     {
-        epochs = epochs.add(topology);
+        Epochs current = epochs;
+
+        Preconditions.checkArgument(topology.epoch == current.nextEpoch());
+        EpochState[] nextEpochs = new EpochState[current.epochs.length + 1];
+        List<Set<Node.Id>> pendingSync = new ArrayList<>(current.pendingSyncComplete);
+        if (!pendingSync.isEmpty())
+        {
+            EpochState currentEpoch = current.epochs[0];
+            if (current.epochs.length <= 1 || current.epochs[1].syncComplete())
+                currentEpoch.markPrevSynced();
+            pendingSync.remove(0).forEach(currentEpoch::recordSyncComplete);
+        }
+        System.arraycopy(current.epochs, 0, nextEpochs, 1, current.epochs.length);
+
+        boolean prevSynced = current.epochs.length == 0 || current.epochs[0].syncComplete();
+        nextEpochs[0] = new EpochState(topology, prevSynced);
+
+        List<AsyncPromise<Void>> futureEpochFutures = new ArrayList<>(current.futureEpochFutures);
+        AsyncPromise<Void> toComplete = !futureEpochFutures.isEmpty() ? futureEpochFutures.remove(0) : null;
+        epochs = new Epochs(nextEpochs, pendingSync, futureEpochFutures);
+        if (toComplete != null)
+            toComplete.trySuccess(null);
+    }
+
+    public synchronized Future<Void> awaitEpoch(long epoch)
+    {
+        return epochs.awaitEpoch(epoch);
     }
 
     @Override
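
awaitEpoch() gives message handlers a hook for requests that arrive from an epoch this
node hasn't seen yet. A sketch of the intended pattern, assuming a hypothetical receive()
redelivery hook that is not part of this patch (addListener(Runnable) is used the same
way elsewhere in this commit):

    long waitFor = topology.maxUnknownEpoch(request.scope());
    if (waitFor > 0)
    {
        configService.fetchTopologyForEpoch(waitFor);
        topology.awaitEpoch(waitFor).addListener(() -> receive(request, from, replyContext));
        return;
    }
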
index cf6388b340d68d30982a7533555a24a35c502653..f5bfc97a34e64687d1fc082ab3aa7f4ca39c215d 100644 (file)
@@ -8,7 +8,6 @@ import java.util.stream.Stream;
 import accord.api.Key;
 import accord.api.KeyRange;
 import accord.topology.KeyRanges;
-import com.google.common.base.Preconditions;
 
 @SuppressWarnings("rawtypes")
 public class Keys implements Iterable<Key>
@@ -67,6 +66,42 @@ public class Keys implements Iterable<Key>
         return new Keys(selection);
     }
 
+    /**
+     * Returns true if this collection contains every key present in the given collection
+     */
+    public boolean containsAll(Keys that)
+    {
+        if (isEmpty())
+            return that.isEmpty();
+
+        for (int thisIdx=0, thatIdx=0, thisSize=size(), thatSize=that.size();
+             thatIdx<thatSize;
+             thisIdx++, thatIdx++)
+        {
+            if (thisIdx >= thisSize)
+                return false;
+
+            Key thatKey = that.keys[thatIdx];
+            Key thisKey = this.keys[thisIdx];
+            int cmp = thisKey.compareTo(thatKey);
+
+            if (cmp == 0)
+                continue;
+
+            // if this key is greater than that key, that key cannot be present
+            if (cmp > 0)
+                return false;
+
+            // if search returns a positive index, a match was found and
+            // no further comparison is needed
+            thisIdx = Arrays.binarySearch(keys, thisIdx, thisSize, thatKey);
+            if (thisIdx < 0)
+                return false;
+        }
+
+        return true;
+    }
+
     public Keys merge(Keys that)
     {
         int thisIdx = 0;
@@ -225,20 +260,16 @@ public class Keys implements Iterable<Key>
         return result != null ? new Keys(result) : EMPTY;
     }
 
-    public interface KeyAccumulator<V>
+    public interface KeyFold<V>
     {
-        V accumulate(Key key, V value);
-        default boolean isDone()
-        {
-            return false;
-        }
+        V fold(Key key, V value);
     }
 
     /**
-     * Count the number of keys matching the predicate and intersecting with the given ranges.
-     * If terminateAfter is greater than 0, the method will return once terminateAfter matches are encountered
+     * Fold the given function over each key intersecting the given ranges, threading the
+     * accumulator through each application.
      */
-    public <V> V accumulate(KeyRanges ranges, KeyAccumulator<V> accumulator, V value)
+    public <V> V foldl(KeyRanges ranges, KeyFold<V> fold, V accumulator)
     {
         int keyLB = 0;
         int keyHB = size();
@@ -249,7 +280,7 @@ public class Keys implements Iterable<Key>
         for (;rangeLB<rangeHB && keyLB<keyHB;)
         {
             Key key = keys[keyLB];
-            rangeLB = ranges.rangeIndexForKey(rangeLB, ranges.size(), key);
+            rangeLB = ranges.rangeIndexForKey(rangeLB, rangeHB, key);
 
             if (rangeLB < 0)
             {
@@ -264,11 +295,7 @@ public class Keys implements Iterable<Key>
                 int highKey = range.higherKeyIndex(this, keyLB, keyHB);
 
                 for (int i=keyLB; i<highKey; i++)
-                {
-                    value = accumulator.accumulate(keys[i], value);
-                    if (accumulator.isDone())
-                        return value;
-                }
+                    accumulator = fold.fold(keys[i], accumulator);
 
                 keyLB = highKey;
                 rangeLB++;
@@ -278,43 +305,59 @@ public class Keys implements Iterable<Key>
                 keyLB = -1 - keyLB;
         }
 
-        return value;
+        return accumulator;
     }
 
-    public <V> V accumulate(KeyRanges ranges, KeyAccumulator<V> accumulator)
+    public boolean any(KeyRanges ranges, Predicate<Key> predicate)
     {
-        return accumulate(ranges, accumulator, null);
+        return 1 == foldl(ranges, (key, param, prev) -> predicate.test(key) ? 1 : 0, 0, 0, 1);
     }
 
-    private static class TerminatingKeyAccumulator<V> implements KeyAccumulator<V>
+    public interface FoldKeysToLong
     {
-        private boolean isDone = false;
-        private final Predicate<Key> predicate;
+        long apply(Key key, long param, long prev);
+    }
 
-        public TerminatingKeyAccumulator(Predicate<Key> predicate)
-        {
-            this.predicate = predicate;
-        }
+    public long foldl(KeyRanges ranges, FoldKeysToLong fold, long param, long initialValue, long terminalValue)
+    {
+        // guard the keys[keyHB-1] probe below against an empty key set
+        if (isEmpty())
+            return initialValue;
+
+        int keyLB = 0;
+        int keyHB = size();
+        int rangeLB = 0;
+        int rangeHB = ranges.rangeIndexForKey(keys[keyHB-1]);
+        rangeHB = rangeHB < 0 ? -1 - rangeHB : rangeHB + 1;
 
-        @Override
-        final public V accumulate(Key key, V value)
+        for (;rangeLB<rangeHB && keyLB<keyHB;)
         {
-            Preconditions.checkState(!isDone);
-            isDone = predicate.test(key);
-            return value;
-        }
+            Key key = keys[keyLB];
+            rangeLB = ranges.rangeIndexForKey(rangeLB, rangeHB, key);
 
-        @Override
-        final public boolean isDone()
-        {
-            return isDone;
+            if (rangeLB < 0)
+            {
+                rangeLB = -1 - rangeLB;
+                if (rangeLB >= rangeHB)
+                    break;
+                keyLB = ranges.get(rangeLB).lowKeyIndex(this, keyLB, keyHB);
+            }
+            else
+            {
+                KeyRange<?> range = ranges.get(rangeLB);
+                int highKey = range.higherKeyIndex(this, keyLB, keyHB);
+
+                for (int i=keyLB; i<highKey; i++)
+                {
+                    initialValue = fold.apply(keys[i], param, initialValue);
+                    if (terminalValue == initialValue)
+                        return initialValue;
+                }
+
+                keyLB = highKey;
+                rangeLB++;
+            }
+
+            if (keyLB < 0)
+                keyLB = -1 - keyLB;
         }
-    }
 
-    public boolean any(KeyRanges ranges, Predicate<Key> predicate)
-    {
-        TerminatingKeyAccumulator<Void> accumulator = new TerminatingKeyAccumulator<>(predicate);
-        accumulate(ranges, accumulator, null);
-        return accumulator.isDone();
+        return initialValue;
     }
 }
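
Two quick sketches of the new primitives, using the IntKey keys(...) helper from the
tests (values invented). containsAll is a merge-style scan over both sorted arrays, and
the long-valued foldl stops as soon as the fold returns terminalValue:

    Keys keys = keys(100, 200, 300);
    assert keys.containsAll(keys(100, 300));   // every key found
    assert !keys.containsAll(keys(100, 250));  // 250 is missing

    // any() rides the early exit: the fold yields 1 (== terminalValue) on the first match
    KeyRanges ranges = new KeyRanges(new KeyRange[]{ IntKey.range(0, 1000) });
    assert keys.any(ranges, k -> k.equals(IntKey.key(200)));
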
index 9466a1c79c8c7dc5d88c227e456828003f68dff9..ae2619dc0644c576f8776c2397f6ad519dad9a88 100644 (file)
@@ -6,7 +6,6 @@ import java.util.stream.Stream;
 
 import accord.api.*;
 import accord.local.*;
-import accord.topology.KeyRanges;
 
 public class Txn
 {
@@ -90,14 +89,14 @@ public class Txn
 
     public Data read(Command command, Keys keyScope)
     {
-        return keyScope.accumulate(command.commandStore.ranges(), (key, accumulate) -> {
+        return keyScope.foldl(command.commandStore.ranges(), (key, accumulate) -> {
             CommandStore commandStore = command.commandStore;
             if (!commandStore.hashIntersects(key))
                 return accumulate;
 
             Data result = read.read(key, command.executeAt(), commandStore.store());
             return accumulate != null ? accumulate.merge(result) : result;
-        });
+        }, null);
     }
 
     public Timestamp maxConflict(CommandStore commandStore)
index 019ed8bf11a54bd99c670d944d2530ec91d5131d..67bc51c711a88625337891fc3206809a5c84c89e 100644 (file)
@@ -21,11 +21,11 @@ public class Writes
         if (write == null)
             return;
 
-        keys.accumulate(commandStore.ranges(), (key, accumulate) -> {
+        keys.foldl(commandStore.ranges(), (key, accumulate) -> {
             if (commandStore.hashIntersects(key))
                 write.apply(key, executeAt, commandStore.store());
             return accumulate;
-        });
+        }, null);
     }
 
     @Override
index b05b62a197ee1bbeee417881c87ada515c791d9f..90ba39d27b8136bf396ab75f5ade634ccb01c179 100644 (file)
@@ -21,12 +21,18 @@ public class DeterministicIdentitySet<T> extends AbstractSet<T>
     }
 
     // TODO: an identity hash map that doesn't mind concurrent modification / iteration
-    final IdentityHashMap<T, Entry<T>> lookup = new IdentityHashMap<>();
+    final IdentityHashMap<T, Entry<T>> lookup;
     final Entry<T> head = new Entry<T>(null);
 
     public DeterministicIdentitySet()
+    {
+        this(0);
+    }
+
+    public DeterministicIdentitySet(int size)
     {
         head.prev = head.next = head;
+        lookup = new IdentityHashMap<>(size);
     }
 
     @Override
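
The presized constructor is presumably for call sites that already know the member count,
avoiding rehashing on insert; a trivial sketch using collectLocal from this patch:

    List<CommandStore> instances = node.collectLocal(keys, ArrayList::new);
    DeterministicIdentitySet<CommandStore> stores = new DeterministicIdentitySet<>(instances.size());
    instances.forEach(stores::add);
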
index 1ba217fc786d1e14edf16cbce52ff7178f6e05af..30b5cbb537a453d68bc7a2317e7e87edfc46476b 100644 (file)
@@ -5,6 +5,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import accord.api.Scheduler;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,7 +78,7 @@ public class ThreadPoolScheduler implements Scheduler
         }
         catch (InterruptedException e)
         {
-            throw new IllegalStateException(e);
+            throw new UncheckedInterruptedException(e);
         }
     }
 }
index 03f8bfdf98bc22cfeee7b5de2d8a29902a661e32..1f06656df654f2ab1f2f659ff2ae13c85765b93a 100644 (file)
@@ -1,8 +1,15 @@
 package accord;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import accord.api.Key;
 import accord.api.KeyRange;
 import accord.impl.IntKey;
 import accord.topology.KeyRanges;
+import accord.txn.Keys;
+
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import static accord.impl.IntKey.keys;
@@ -19,6 +26,7 @@ public class KeysTest
     {
         return new KeyRanges(ranges);
     }
+
     @Test
     void intersectionTest()
     {
@@ -43,4 +51,48 @@ public class KeysTest
         assertEquals(keys(0, 1, 2, 3, 4),
                      keys(0, 2, 4).merge(keys(1, 3)));
     }
+
+    @Test
+    void foldlTest()
+    {
+        List<Key> keys = new ArrayList<>();
+        long result = keys(150, 250, 350, 450, 550).foldl(ranges(r(200, 400)), (key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1);
+        assertEquals(16, result);
+        assertEquals(keys(250, 350), new Keys(keys));
+
+        keys.clear();
+        result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 500)), (key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1);
+        assertEquals(3616, result);
+        assertEquals(keys(150, 250, 350, 450), new Keys(keys));
+
+        keys.clear();
+        result = keys(150, 250, 350, 450, 550).foldl(ranges(r(500, 1000)), (key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1);
+        assertEquals(1, result);
+        assertEquals(keys(550), new Keys(keys));
+
+        keys.clear();
+        result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 20), r(100, 140), r(149, 151), r(560, 2000)), (key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1);
+        assertEquals(1, result);
+        assertEquals(keys(150), new Keys(keys));
+    }
+
+    @Test
+    void containsAll()
+    {
+        Keys keys = keys(150, 200, 250, 300, 350);
+        Assertions.assertTrue(keys.containsAll(keys(150, 200)));
+        Assertions.assertTrue(keys.containsAll(keys(150, 250)));
+        Assertions.assertTrue(keys.containsAll(keys(200, 250)));
+        Assertions.assertTrue(keys.containsAll(keys(200, 300)));
+        Assertions.assertTrue(keys.containsAll(keys(250, 300)));
+        Assertions.assertTrue(keys.containsAll(keys(250, 350)));
+
+        Assertions.assertFalse(keys.containsAll(keys(100, 150)));
+        Assertions.assertFalse(keys.containsAll(keys(100, 250)));
+        Assertions.assertFalse(keys.containsAll(keys(200, 225)));
+        Assertions.assertFalse(keys.containsAll(keys(225, 300)));
+        Assertions.assertFalse(keys.containsAll(keys(250, 235)));
+        Assertions.assertFalse(keys.containsAll(keys(250, 400)));
+    }
 }
index a516bb2e38a87c1abd422fe7c18c65e54c53710a..05014170cc3c391eb10080232e58deb84d1f078c 100644 (file)
@@ -247,7 +247,7 @@ public class BurnTest
             }
             catch (Throwable t)
             {
-                logger.error("Exception running burn test:", t);
+                logger.error("Exception running burn test for seed {}:", seed, t);
                 throw t;
             }
         } while (overrideSeed == null);
index e7215f24538077987c1538177b696298503b0107..dae12e90464ec223082144d8964ebe66dc24ab9e 100644 (file)
@@ -6,9 +6,9 @@ import accord.api.TestableConfigurationService;
 import accord.local.Node;
 import accord.messages.*;
 import accord.topology.Topology;
+import com.google.common.base.Preconditions;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -16,27 +16,104 @@ import java.util.*;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
-// TODO: merge with MockConfigurationService?
 public class BurnTestConfigurationService implements TestableConfigurationService
 {
     private static final Logger logger = LoggerFactory.getLogger(BurnTestConfigurationService.class);
-    private static final Future<Void> SUCCESS = ImmediateFuture.success(null);
 
     private final Node.Id node;
     private final MessageSink messageSink;
     private final Function<Node.Id, Node> lookup;
     private final Supplier<Random> randomSupplier;
-    private final List<Topology> epochs = new ArrayList<>();
+    private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>();
+
+    private final EpochHistory epochs = new EpochHistory();
     private final List<ConfigurationService.Listener> listeners = new ArrayList<>();
 
+    private static class EpochState
+    {
+        private final long epoch;
+        private final AsyncPromise<Topology> received = new AsyncPromise<>();
+        private final AsyncPromise<Void> acknowledged = new AsyncPromise<>();
+        private final AsyncPromise<Void> synced = new AsyncPromise<>();
+
+        private Topology topology = null;
+
+        public EpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+    }
+
+    private static class EpochHistory
+    {
+        // TODO: move pendingEpochs / FetchTopology into here?
+        private final List<EpochState> epochs = new ArrayList<>();
+
+        private long lastReceived = 0;
+        private long lastAcknowledged = 0;
+        private long lastSynced = 0;
+
+        private EpochState get(long epoch)
+        {
+            for (long addEpoch = epochs.size(); addEpoch <= epoch; addEpoch++)
+                epochs.add(new EpochState(addEpoch));
+            return epochs.get((int) epoch);
+        }
+
+        EpochHistory receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Preconditions.checkState(epoch == 0 || lastReceived == epoch - 1);
+            lastReceived = epoch;
+            EpochState state = get(epoch);
+            state.topology = topology;
+            state.received.setSuccess(topology);
+            return this;
+        }
+
+        Future<Topology> receiveFuture(long epoch)
+        {
+            return get(epoch).received;
+        }
+
+        Topology topologyFor(long epoch)
+        {
+            return get(epoch).topology;
+        }
+
+        EpochHistory acknowledge(long epoch)
+        {
+            Preconditions.checkState(epoch == 0 || lastAcknowledged == epoch - 1);
+            lastAcknowledged = epoch;
+            get(epoch).acknowledged.setSuccess(null);
+            return this;
+        }
+
+        Future<Void> acknowledgeFuture(long epoch)
+        {
+            return get(epoch).acknowledged;
+        }
+
+        EpochHistory syncComplete(long epoch)
+        {
+            Preconditions.checkState(epoch == 0 || lastSynced == epoch - 1);
+            EpochState state = get(epoch);
+            Preconditions.checkState(state.received.isDone());
+            Preconditions.checkState(state.acknowledged.isDone());
+            lastSynced = epoch;
+            state.synced.setSuccess(null);
+            return this;
+        }
+    }
+
     public BurnTestConfigurationService(Node.Id node, MessageSink messageSink, Supplier<Random> randomSupplier, Topology topology, Function<Node.Id, Node> lookup)
     {
         this.node = node;
         this.messageSink = messageSink;
         this.randomSupplier = randomSupplier;
         this.lookup = lookup;
-        epochs.add(Topology.EMPTY);
-        epochs.add(topology);
+        epochs.receive(Topology.EMPTY).acknowledge(0).syncComplete(0);
+        epochs.receive(topology).acknowledge(1).syncComplete(1);
     }
 
     @Override
@@ -48,13 +125,13 @@ public class BurnTestConfigurationService implements TestableConfigurationServic
     @Override
     public synchronized Topology currentTopology()
     {
-        return epochs.get(epochs.size() - 1);
+        return epochs.topologyFor(epochs.lastReceived);
     }
 
     @Override
     public synchronized Topology getTopologyForEpoch(long epoch)
     {
-        return epoch >= epochs.size() ? null : epochs.get((int) epoch);
+        return epochs.topologyFor(epoch);
     }
 
     private static class FetchTopologyRequest implements Request
@@ -114,8 +191,6 @@ public class BurnTestConfigurationService implements TestableConfigurationServic
         private final FetchTopologyRequest request;
         private final List<Node.Id> candidates;
 
-        private final Set<Runnable> onComplete = new HashSet<>();
-
         public FetchTopology(long epoch)
         {
             this.request = new FetchTopologyRequest(epoch);
@@ -151,23 +226,19 @@ public class BurnTestConfigurationService implements TestableConfigurationServic
         }
     }
 
-    private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>();
-
     @Override
-    public synchronized Future<Void> fetchTopologyForEpoch(long epoch)
+    public synchronized void fetchTopologyForEpoch(long epoch)
     {
-        if (epoch < epochs.size())
-        {
-            return SUCCESS;
-        }
+        if (epoch <= epochs.lastReceived)
+            return;
 
-        FetchTopology fetch = pendingEpochs.computeIfAbsent(epoch, FetchTopology::new);
-        return fetch;
+        pendingEpochs.computeIfAbsent(epoch, FetchTopology::new);
     }
 
     @Override
-    public void acknowledgeEpoch(long epoch)
+    public synchronized void acknowledgeEpoch(long epoch)
     {
+        epochs.acknowledge(epoch);
         Topology topology = getTopologyForEpoch(epoch);
         Node originator = lookup.apply(node);
         TopologyUpdate.syncEpoch(originator, epoch - 1, topology.nodes());
@@ -176,18 +247,26 @@ public class BurnTestConfigurationService implements TestableConfigurationServic
     @Override
     public synchronized void reportTopology(Topology topology)
     {
-        if (topology.epoch() < epochs.size())
+        long lastReceived = epochs.lastReceived;
+        if (topology.epoch() <= lastReceived)
             return;
 
-        if (topology.epoch() > epochs.size())
+        if (topology.epoch() > lastReceived + 1)
         {
-            fetchTopologyForEpoch(epochs.size() + 1).addListener(() -> reportTopology(topology));
+            fetchTopologyForEpoch(lastReceived + 1);
+            epochs.receiveFuture(lastReceived + 1).addListener(() -> reportTopology(topology));
             return;
         }
-        logger.trace("Epoch {} received by {}", topology.epoch(), node);
 
-        epochs.add(topology);
+        long lastAcked = epochs.lastAcknowledged;
+        if (topology.epoch() > lastAcked + 1)
+        {
+            epochs.acknowledgeFuture(lastAcked + 1).addListener(() -> reportTopology(topology));
+            return;
+        }
+        logger.trace("Epoch {} received by {}", topology.epoch(), node);
 
+        epochs.receive(topology);
         for (Listener listener : listeners)
             listener.onTopologyUpdate(topology);
 
@@ -197,5 +276,4 @@ public class BurnTestConfigurationService implements TestableConfigurationServic
 
         fetch.setSuccess(null);
     }
-
 }
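
EpochHistory serializes the three per-epoch milestones (received, acknowledged, synced),
each of which must arrive in epoch order; reportTopology() chains itself on the
receive/acknowledge futures when a topology arrives early. A happy-path sketch (epoch
numbers invented, mirroring the constructor above):

    EpochHistory epochs = new EpochHistory();
    epochs.receive(Topology.EMPTY).acknowledge(0).syncComplete(0); // bootstrap epoch 0
    epochs.receive(topology1).acknowledge(1).syncComplete(1);      // epoch 1 must follow
    epochs.receiveFuture(2).addListener(() -> {});                 // fires when epoch 2 lands
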
index 474c5222385125dd3e7ec1ad3a65d6d0cfc3c514..ae22d60f06ec154d664cd142b7531f7d7f855642 100644 (file)
@@ -31,28 +31,7 @@ public class TopologyUpdate
 {
     private static final Logger logger = LoggerFactory.getLogger(TopologyUpdate.class);
 
-    private static class CountDownFuture<T> extends AsyncPromise<T>
-    {
-        private final AtomicInteger remaining;
-        private final T value;
-
-        public CountDownFuture(int count, T value)
-        {
-            this.remaining = new AtomicInteger(count);
-            this.value = value;
-        }
-
-        public CountDownFuture(int count)
-        {
-            this(count, null);
-        }
-
-        public void countDown()
-        {
-            if (remaining.decrementAndGet() == 0)
-                setSuccess(value);
-        }
-    }
+    private static final Set<Long> pendingTopologies = Sets.newConcurrentHashSet();
 
     private static class CommandSync
     {
@@ -78,7 +57,7 @@ public class TopologyUpdate
         }
         public void process(Node node)
         {
-            node.local(txn.keys()).forEach(commandStore -> {
+            node.forEachLocal(txn, commandStore -> {
                 switch (status)
                 {
                     case PreAccepted:
@@ -118,18 +97,9 @@ public class TopologyUpdate
         return stage.addCallback(dieOnException());
     }
 
-    private static <T> Future<Void> map(Collection<T> items, Function<T, Future<Void>> function)
-    {
-        CountDownFuture<Void> latch = new CountDownFuture<>(items.size());
-        for (T item : items)
-        {
-            function.apply(item).addListener(latch::countDown);
-        }
-        return latch;
-    }
-
     public static MessageTask notify(Node originator, Collection<Node.Id> cluster, Topology update)
     {
+        pendingTopologies.add(update.epoch());
         return MessageTask.begin(originator, cluster, "TopologyNotify:" + update.epoch(), (node, from) -> {
             long nodeEpoch = node.topology().epoch();
             if (nodeEpoch + 1 < update.epoch())
@@ -139,11 +109,6 @@ public class TopologyUpdate
         });
     }
 
-    private static Future<Void> broadcast(List<Node.Id> cluster, Function<Node.Id, Node> lookup, String desc, BiConsumer<Node, Node.Id> process)
-    {
-        return map(cluster, node -> MessageTask.apply(lookup.apply(node), cluster, desc, process));
-    }
-
     private static Collection<Node.Id> allNodesFor(Txn txn, Topology... topologies)
     {
         Set<Node.Id> result = new HashSet<>();
@@ -157,9 +122,9 @@ public class TopologyUpdate
         Map<TxnId, CommandSync> syncMessages = new ConcurrentHashMap<>();
         Consumer<Command> commandConsumer = command -> syncMessages.put(command.txnId(), new CommandSync(command));
         if (committedOnly)
-            node.local().forEach(commandStore -> commandStore.forCommittedInEpoch(ranges, epoch, commandConsumer));
+            node.forEachLocal(commandStore -> commandStore.forCommittedInEpoch(ranges, epoch, commandConsumer));
         else
-            node.local().forEach(commandStore -> commandStore.forEpochCommands(ranges, epoch, commandConsumer));
+            node.forEachLocal(commandStore -> commandStore.forEpochCommands(ranges, epoch, commandConsumer));
         return syncMessages.values().stream().map(cmd -> MessageTask.of(node, recipients.apply(cmd), "Sync:" + cmd.txnId + ':' + epoch + ':' + forEpoch, cmd::process));
     }
 
@@ -211,6 +176,10 @@ public class TopologyUpdate
                     continue;
 
                 Set<Node.Id> newNodes = Sets.difference(nextShard.nodeSet, syncShard.nodeSet);
+
+                if (newNodes.isEmpty())
+                    continue;
+
                 KeyRanges ranges = KeyRanges.singleton(intersection);
                 for (long epoch=1; epoch<syncEpoch; epoch++)
                     messageStream = Stream.concat(messageStream, syncEpochCommands(node,
@@ -254,20 +223,16 @@ public class TopologyUpdate
         return dieExceptionally(last);
     }
 
-    public static void update(Node originator, Topology update, List<Node.Id> cluster, Function<Node.Id, Node> lookup)
+    public static Future<Void> syncEpoch(Node originator, long epoch, Collection<Node.Id> cluster)
     {
-        long epoch = update.epoch();
-        // notify
-        dieExceptionally(notify(originator, cluster, update)
-                // sync operations
-                .flatMap(v -> map(cluster, node -> sync(lookup.apply(node), epoch - 1)))
-                // inform sync complete
-                .flatMap(v -> broadcast(cluster, lookup, "SyncComplete:" + epoch, (node, from) -> node.onEpochSyncComplete(from, epoch))));
+        Future<Void> future = dieExceptionally(sync(originator, epoch)
+                .flatMap(v -> MessageTask.apply(originator, cluster, "SyncComplete:" + epoch, (node, from) -> node.onEpochSyncComplete(originator.id(), epoch))));
+        future.addCallback((unused, throwable) -> pendingTopologies.remove(epoch));
+        return future;
     }
 
-    public static Future<Void> syncEpoch(Node originator, long epoch, Collection<Node.Id> cluster)
+    public static int pendingTopologies()
     {
-        return dieExceptionally(sync(originator, epoch)
-                .flatMap(v -> MessageTask.apply(originator, cluster, "SyncComplete:" + epoch, (node, from) -> node.onEpochSyncComplete(originator.id(), epoch))));
+        return pendingTopologies.size();
     }
 }
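
pendingTopologies is simple backpressure bookkeeping for the burn test: notify() registers
an epoch as in flight and syncEpoch() clears it once the sync round completes. The
randomizer hunk below throttles on it:

    // from TopologyRandomizer.maybeUpdateTopology in this patch
    if (TopologyUpdate.pendingTopologies() > 5 || random.nextInt(200) != 0)
        return;
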
index 68e7f02a521977508d0d53cd4026b7e5ccea0d61..67b14055f6ff47ef4f5fda412e8c5a542448e54a 100644 (file)
@@ -25,7 +25,7 @@ public class RecoverTest
 {
     private static CommandStore getCommandShard(Node node, Key key)
     {
-        return node.local(key).orElseThrow();
+        return node.unsafeForKey(key);
     }
 
     private static Command getCommand(Node node, Key key, TxnId txnId)
index 5a8e210223d99dae163ae4a991063fa992dd2d51..de053987be7ce6377274b88067c529e720d24f79 100644 (file)
@@ -55,7 +55,7 @@ public class TopologyChangeTest
             TxnId txnId1 = node1.nextTxnId();
             Txn txn1 = writeTxn(keys);
             node1.coordinate(txnId1, txn1).get();
-            node1.local(keys).forEach(commands -> {
+            node1.forEachLocal(keys, commands -> {
                 Command command = commands.command(txnId1);
                 Assertions.assertTrue(command.savedDeps().isEmpty());
             });
@@ -69,20 +69,15 @@ public class TopologyChangeTest
 
             // new nodes should have the previous epochs operation as a dependency
             cluster.nodes(4, 5, 6).forEach(node -> {
-                node.local(keys).forEach(commands -> {
+                node.forEachLocal(keys, commands -> {
                     Command command = commands.command(txnId2);
                     Assertions.assertTrue(command.savedDeps().contains(txnId1));
                 });
             });
 
-            // old nodes should be aware of the new epoch
-            cluster.configServices(1, 2, 3).forEach(config -> {
-                Assertions.assertEquals(2, config.currentEpoch());
-            });
-
             // ...and participated in consensus
             cluster.nodes(1, 2, 3).forEach(node -> {
-                node.local(keys).forEach(commands -> {
+                node.forEachLocal(keys, commands -> {
                     Command command = commands.command(txnId2);
                     Assertions.assertTrue(command.hasBeen(Status.Committed));
                 });
@@ -105,7 +100,7 @@ public class TopologyChangeTest
             RecordingMessageSink messageSink = (RecordingMessageSink) node1.messageSink();
             messageSink.clearHistory();
             TxnId txnId1 = coordinate(node1, keys);
-            node1.local(keys).forEach(commands -> {
+            node1.forEachLocal(keys, commands -> {
                 Command command = commands.command(txnId1);
                 Assertions.assertTrue(command.savedDeps().isEmpty());
             });
@@ -127,7 +122,7 @@ public class TopologyChangeTest
             }).collect(Collectors.toSet());
             Assertions.assertEquals(idSet(1, 2, 3), accepts);
 
-            node1.local(keys).forEach(commands -> {
+            node1.forEachLocal(keys, commands -> {
                 Command command = commands.command(txnId2);
                 Assertions.assertTrue(command.hasBeen(Status.Committed));
                 Assertions.assertTrue(command.savedDeps().contains(txnId1));
@@ -140,7 +135,7 @@ public class TopologyChangeTest
             messageSink.clearHistory();
             TxnId txnId3 = coordinate(node1, keys);
             Assertions.assertFalse(messageSink.requests.stream().anyMatch(env -> env.payload instanceof Accept));
-            node1.local(keys).forEach(commands -> {
+            node1.forEachLocal(keys, commands -> {
                 Command command = commands.command(txnId3);
                 Assertions.assertTrue(command.hasBeen(Status.Committed));
                 Assertions.assertTrue(command.savedDeps().contains(txnId1));
index d9f59145302a0e556486a1cd4b44a2efa8d92515..2a1429dd7be0ec7e8bd82367af1ec267eda33df7 100644 (file)
@@ -17,7 +17,7 @@ import java.util.function.Supplier;
 
 import accord.api.MessageSink;
 import accord.burn.BurnTestConfigurationService;
-import accord.local.CommandStore;
+import accord.local.CommandStores;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.Scheduler;
@@ -96,10 +96,10 @@ public class Cluster implements Scheduler
                             || !partitionSet.contains(deliver.src) && !partitionSet.contains(deliver.dst));
             if (drop)
             {
-                logger.trace("{} DROP[{}] {}", clock++, on.epoch(), deliver);
+                logger.debug("{} DROP[{}] {}", clock++, on.epoch(), deliver);
                 return true;
             }
-            logger.trace("{} RECV[{}] {}", clock++, on.epoch(), deliver);
+            logger.debug("{} RECV[{}] {}", clock++, on.epoch(), deliver);
             if (deliver.message instanceof Reply)
             {
                 Reply reply = (Reply) deliver.message;
@@ -153,7 +153,7 @@ public class Cluster implements Scheduler
                 MessageSink messageSink = sinks.create(node, randomSupplier.get());
                 BurnTestConfigurationService configService = new BurnTestConfigurationService(node, messageSink, randomSupplier, topology, lookup::get);
                 lookup.put(node, new Node(node, messageSink, configService,
-                                          nowSupplier.get(), () -> new ListStore(node), ListAgent.INSTANCE, sinks, CommandStore.Factory.SYNCHRONIZED));
+                                          nowSupplier.get(), () -> new ListStore(node), ListAgent.INSTANCE, sinks, CommandStores.Synchronized::new));
             }
 
             List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
index 35ef8d56d405adef10948573e086126d81263a8a..3008de6e26dec81cccaedba541fe887731dfb5f6 100644 (file)
@@ -1,6 +1,9 @@
 package accord.impl.list;
 
+import java.util.Arrays;
+import java.util.Map;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 import accord.api.Data;
 import accord.api.Key;
@@ -10,7 +13,16 @@ public class ListData extends TreeMap<Key, int[]> implements Data
     @Override
     public Data merge(Data data)
     {
-        this.putAll(((ListData)data));
+        if (data != null)
+            this.putAll(((ListData)data));
         return this;
     }
+
+    @Override
+    public String toString()
+    {
+        return entrySet().stream()
+                         .map(e -> e.getKey() + "=" + Arrays.toString(e.getValue()))
+                         .collect(Collectors.joining(", ", "{", "}"));
+    }
 }
index 29cc6abc6a0d116bc81d0b7436253309f62ea724..4be25f36059f43792b3e21da607dce999c8db718 100644 (file)
@@ -1,6 +1,8 @@
 package accord.impl.list;
 
+import java.util.Arrays;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 import accord.api.Key;
 import accord.api.Store;
@@ -23,4 +25,12 @@ public class ListWrite extends TreeMap<Key, int[]> implements Write
         s.data.merge(key, new Timestamped<>(executeAt, data), Timestamped::merge);
         logger.trace("WRITE on {} at {} key:{} -> {}", s.node, executeAt, key, data);
     }
+
+    @Override
+    public String toString()
+    {
+        return entrySet().stream()
+                         .map(e -> e.getKey() + "=" + Arrays.toString(e.getValue()))
+                         .collect(Collectors.joining(", ", "{", "}"));
+    }
 }
index 71914dd7da42cafcc8dc9a21a5f9d63799f0fb3a..80ad2d203dc9dc9d05868a9c852c6d332e8a64d4 100644 (file)
@@ -52,7 +52,7 @@ public class EpochSync implements Runnable
         @Override
         public void process(Node node, Node.Id from, ReplyContext replyContext)
         {
-            node.local().forEach(commandStore -> {
+            node.forEachLocal(commandStore -> {
                 Command command = commandStore.command(txnId);
                 command.commit(txn, deps, executeAt);
             });
@@ -158,7 +158,7 @@ public class EpochSync implements Runnable
         {
             Map<TxnId, SyncMessage> syncMessages = new ConcurrentHashMap<>();
             Consumer<Command> commandConsumer = command -> syncMessages.put(command.txnId(), new SyncMessage(command));
-            node.local().forEach(commandStore -> commandStore.forCommittedInEpoch(syncTopology.ranges(), syncEpoch, commandConsumer));
+            node.forEachLocal(commandStore -> commandStore.forCommittedInEpoch(syncTopology.ranges(), syncEpoch, commandConsumer));
 
             for (SyncMessage message : syncMessages.values())
                 CommandSync.sync(node, message, nextTopology);
index 8e00794cfcba99bd5cf7569d7335a04ea3faee07..0a7076009c621d6b3b80d155dc0be49a36ace42a 100644 (file)
@@ -4,7 +4,7 @@ import accord.NetworkFilter;
 import accord.api.MessageSink;
 import accord.coordinate.Timeout;
 import accord.impl.TopologyUtils;
-import accord.local.CommandStore;
+import accord.local.CommandStores;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.topology.KeyRanges;
@@ -24,7 +24,6 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
-import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 
 import static accord.Utils.*;
@@ -90,7 +89,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node>
                         () -> store,
                         new TestAgent(),
                         new ThreadPoolScheduler(),
-                        CommandStore.Factory.SINGLE_THREAD);
+                        CommandStores.SingleThread::new);
     }
 
     private void init(Topology topology)
index 74add97607e51f60d84274a98daf3720c184384b..d0208eecee4382229ed1544507b5fb22047eb049 100644 (file)
@@ -18,7 +18,6 @@ public class MockConfigurationService implements TestableConfigurationService
     private final MessageSink messageSink;
     private final List<Topology> epochs = new ArrayList<>();
     private final List<Listener> listeners = new ArrayList<>();
-    private final Map<Long, AsyncPromise<Void>> pending = new HashMap<>();
     private final EpochFunction<MockConfigurationService> fetchTopologyHandler;
 
     public MockConfigurationService(MessageSink messageSink, EpochFunction<MockConfigurationService> fetchTopologyHandler)
@@ -53,16 +52,13 @@ public class MockConfigurationService implements TestableConfigurationService
     }
 
     @Override
-    public synchronized Future<Void> fetchTopologyForEpoch(long epoch)
+    public synchronized void fetchTopologyForEpoch(long epoch)
     {
         if (epoch < epochs.size())
-        {
-            return SUCCESS;
-        }
+            return;
 
-        Future<Void> future = pending.computeIfAbsent(epoch, e -> new AsyncPromise<>());
         fetchTopologyHandler.apply(epoch, this);
-        return future;
     }
 
     @Override
@@ -78,11 +74,6 @@ public class MockConfigurationService implements TestableConfigurationService
 
         for (Listener listener : listeners)
             listener.onTopologyUpdate(topology);
-
-        AsyncPromise<Void> promise = pending.remove(topology.epoch());
-        if (promise == null)
-            return;
-        promise.setSuccess(null);
     }
 
     public synchronized void reportSyncComplete(Node.Id node, long epoch)
index 87cc0c55aead9d023cd4a7730fce1d8e31701e33..e3b384f295c9a9601a8f8629f959c5f0304909f5 100644 (file)
@@ -48,12 +48,12 @@ public class PreAcceptTest
                         () -> store,
                         new TestAgent(),
                         scheduler,
-                        CommandStore.Factory.SINGLE_THREAD);
+                        CommandStores.SingleThread::new);
     }
 
     private static TxnRequest.Scope scope(TxnId txnId, Txn txn)
     {
-        return new TxnRequest.Scope(txnId.epoch, new TxnRequest.Scope.KeysForEpoch(txnId.epoch, txn.keys()));
+        return new TxnRequest.Scope(txnId.epoch, txn.keys());
     }
 
     private static PreAccept preAccept(TxnId txnId, Txn txn)
@@ -72,7 +72,7 @@ public class PreAcceptTest
         try
         {
             IntKey key = IntKey.key(10);
-            CommandStore commandStore = node.local(key).orElseThrow();
+            CommandStore commandStore = node.unsafeForKey(key);
             Assertions.assertFalse(commandStore.hasCommandsForKey(key));
 
             TxnId txnId = clock.idForNode(1, ID2);
@@ -104,7 +104,7 @@ public class PreAcceptTest
         try
         {
             IntKey key = IntKey.key(10);
-            CommandStore commandStore = node.local(key).orElseThrow();
+            CommandStore commandStore = node.unsafeForKey(key);
             Assertions.assertFalse(commandStore.hasCommandsForKey(key));
 
             TxnId txnId = clock.idForNode(1, ID2);
@@ -195,7 +195,7 @@ public class PreAcceptTest
         try
         {
             IntKey key = IntKey.key(10);
-            CommandStore commandStore = node.local(key).orElseThrow();
+            CommandStore commandStore = node.unsafeForKey(key);
 
             configService(node).reportTopology(node.topology().current().withEpoch(2));
             messageSink.clearHistory();
index a1250b8521fffb64f798ae2588f838eadfef02ef..3a0dca491674a7e2a401e5b481a163f85f2af98b 100644 (file)
@@ -1,9 +1,7 @@
 package accord.messages;
 
 import accord.api.KeyRange;
-import accord.impl.IntKey;
 import accord.messages.TxnRequest.Scope;
-import accord.messages.TxnRequest.Scope.KeysForEpoch;
 import accord.topology.Topologies;
 import accord.topology.Topology;
 import accord.txn.Keys;
@@ -12,29 +10,20 @@ import org.junit.jupiter.api.Test;
 
 import static accord.Utils.*;
 import static accord.Utils.idSet;
+import static accord.impl.IntKey.keys;
 import static accord.impl.IntKey.range;
 
 public class TxnRequestScopeTest
 {
-    private static KeysForEpoch epochRanges(long epoch, Keys keys)
+    private static Scope scope(long epoch, Keys keys)
     {
-        return new KeysForEpoch(epoch, keys);
-    }
-
-    private static KeysForEpoch epochRanges(long epoch, int... keys)
-    {
-        return epochRanges(epoch, IntKey.keys(keys));
-    }
-
-    private static Scope scope(long epoch, KeysForEpoch... epochKeys)
-    {
-        return new Scope(epoch, epochKeys);
+        return new Scope(epoch, keys);
     }
 
     @Test
     void createDisjointScopeTest()
     {
-        Keys keys = IntKey.keys(150);
+        Keys keys = keys(150);
         KeyRange range = range(100, 200);
         Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
         Topology topology2 = topology(2, shard(range, idList(4, 5, 6), idSet(4, 5)));
@@ -43,16 +32,16 @@ public class TxnRequestScopeTest
         topologies.add(topology2);
         topologies.add(topology1);
 
-        Assertions.assertEquals(scope(2, epochRanges(1, 150)),
+        Assertions.assertEquals(scope(1, keys(150)),
                                 Scope.forTopologies(id(1), topologies, keys));
-        Assertions.assertEquals(scope(2, epochRanges(2, 150)),
+        Assertions.assertEquals(scope(2, keys(150)),
                                 Scope.forTopologies(id(4), topologies, keys));
     }
 
     @Test
     void movingRangeTest()
     {
-        Keys keys = IntKey.keys(150, 250);
+        Keys keys = keys(150, 250);
         KeyRange range1 = range(100, 200);
         KeyRange range2 = range(200, 300);
         Topology topology1 = topology(1,
@@ -66,9 +55,9 @@ public class TxnRequestScopeTest
         Topologies.Multi topologies = new Topologies.Multi();
         topologies.add(topology2);
         topologies.add(topology1);
-        Assertions.assertEquals(scope(2, epochRanges(1, 150), epochRanges(2, 250)),
+        Assertions.assertEquals(scope(2, keys(150, 250)),
                                 Scope.forTopologies(id(1), topologies, keys));
-        Assertions.assertEquals(scope(2, epochRanges(1, 250), epochRanges(2, 150)),
+        Assertions.assertEquals(scope(2, keys(250, 150)),
                                 Scope.forTopologies(id(4), topologies, keys));
     }
 }
index c794c24229635dc05f45d1714a45a51584bc8757..06f088f8447ceb125933b9c39094495752c8740f 100644 (file)
@@ -18,6 +18,7 @@ public class TopologyRandomizer
     private final Supplier<Random> randomSupplier;
     private final Function<Node.Id, Node> lookup;
     private final List<Topology> epochs = new ArrayList<>();
+    private final Map<Node.Id, KeyRanges> previouslyReplicated = new HashMap<>();
 
     public TopologyRandomizer(Supplier<Random> randomSupplier, Topology initialTopology, Function<Node.Id, Node> lookup)
     {
@@ -25,6 +26,8 @@ public class TopologyRandomizer
         this.lookup = lookup;
         this.epochs.add(Topology.EMPTY);
         this.epochs.add(initialTopology);
+        for (Node.Id node : initialTopology.nodes())
+            previouslyReplicated.put(node, initialTopology.rangesForNode(node));
     }
 
     private enum UpdateType
@@ -155,9 +158,41 @@ public class TopologyRandomizer
         return shards;
     }
 
+    private static Map<Node.Id, KeyRanges> getAdditions(Topology current, Topology next)
+    {
+        Map<Node.Id, KeyRanges> additions = new HashMap<>();
+        for (Node.Id node : next.nodes())
+        {
+            KeyRanges prev = current.rangesForNode(node);
+            if (prev == null) prev = KeyRanges.EMPTY;
+
+            KeyRanges added = next.rangesForNode(node).difference(prev);
+            if (added.isEmpty())
+                continue;
+
+            additions.put(node, added);
+        }
+        return additions;
+    }
+
+    private static boolean reassignsRanges(Topology current, Shard[] nextShards, Map<Node.Id, KeyRanges> previouslyReplicated)
+    {
+        Topology next = new Topology(current.epoch + 1, nextShards);
+        Map<Node.Id, KeyRanges> additions = getAdditions(current, next);
+
+        for (Map.Entry<Node.Id, KeyRanges> entry : additions.entrySet())
+        {
+            if (previouslyReplicated.getOrDefault(entry.getKey(), KeyRanges.EMPTY).intersects(entry.getValue()))
+                return true;
+        }
+        return false;
+    }
+
     public synchronized void maybeUpdateTopology()
     {
-        if (randomSupplier.get().nextInt(50) != 0)
+        // if we don't limit the number of pending topology changes in flight,
+        // the topology randomizer will keep the burn test busy indefinitely
+        if (TopologyUpdate.pendingTopologies() > 5 || randomSupplier.get().nextInt(200) != 0)
             return;
 
         Random random = randomSupplier.get();
@@ -172,6 +207,21 @@ public class TopologyRandomizer
 
         Topology nextTopology = new Topology(current.epoch + 1, shards);
 
+        // FIXME: remove this (and the corresponding check in CommandStores) once lower bounds are implemented.
+        //  In the meantime, the logic needed to support acquiring ranges that we previously replicated is pretty
+        //  convoluted without the ability to jettison epochs.
+        if (reassignsRanges(current, shards, previouslyReplicated))
+            return;
+
+        Map<Node.Id, KeyRanges> nextAdditions = getAdditions(current, nextTopology);
+        for (Map.Entry<Node.Id, KeyRanges> entry : nextAdditions.entrySet())
+        {
+            KeyRanges previous = previouslyReplicated.getOrDefault(entry.getKey(), KeyRanges.EMPTY);
+            KeyRanges added = entry.getValue();
+            KeyRanges merged = previous.merge(added).mergeTouching();
+            previouslyReplicated.put(entry.getKey(), merged);
+        }
+
         logger.debug("topology update to: {} from: {}", nextTopology, current);
         epochs.add(nextTopology);
 
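Two things are worth unpacking in the randomizer changes above. First, throttling: topology changes are now both rarer (1 in 200 instead of 1 in 50) and capped at five in flight, so the burn test can drain pending epochs. Second, the reassignment guard: until lower bounds are implemented, a node must never re-acquire a range it replicated in an earlier epoch and has since given up. A self-contained sketch of that guard's bookkeeping, with integer tokens standing in for KeyRanges (the String node names and token model are illustrative only):

    import java.util.*;

    class ReassignmentGuardSketch
    {
        // next minus current, per node: the tokens each node would newly acquire
        static Map<String, Set<Integer>> additions(Map<String, Set<Integer>> current,
                                                   Map<String, Set<Integer>> next)
        {
            Map<String, Set<Integer>> result = new HashMap<>();
            next.forEach((node, owned) -> {
                Set<Integer> added = new HashSet<>(owned);
                added.removeAll(current.getOrDefault(node, Set.of()));
                if (!added.isEmpty())
                    result.put(node, added);
            });
            return result;
        }

        // true if any node would re-acquire a token it has replicated before
        static boolean reassigns(Map<String, Set<Integer>> additions,
                                 Map<String, Set<Integer>> previouslyReplicated)
        {
            return additions.entrySet().stream().anyMatch(
                e -> e.getValue().stream().anyMatch(
                    previouslyReplicated.getOrDefault(e.getKey(), Set.of())::contains));
        }
    }

Under this model, a node that held token 5 in epoch 1, shed it in epoch 2, and is offered it again in epoch 3 trips the guard, while a first-time acquisition merely extends the node's previouslyReplicated set.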
index 4db5452f15326d99a8410fd26c6c1fee62fb5b93..290251c80c8f3ff8f7220bfe56a3b51ab74acdc2 100644 (file)
@@ -123,6 +123,7 @@ public class MessageTask extends AsyncPromise<Void> implements Runnable
                         List<Node.Id> recipients,
                         String desc, NodeProcess process)
     {
+        Preconditions.checkArgument(!recipients.isEmpty());
         this.originator = originator;
         this.recipients = ImmutableList.copyOf(recipients);
         this.desc = desc;
index af4d294d578ebeb62f450763f2478f9006d93ac9..eb0eeeb049731606e75f2d403140bdcd58eca859 100644 (file)
@@ -14,7 +14,7 @@ import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 import accord.coordinate.Timeout;
-import accord.local.CommandStore;
+import accord.local.CommandStores;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.MessageSink;
@@ -264,7 +264,7 @@ public class Cluster implements Scheduler
             {
                 MessageSink messageSink = sinks.create(node, randomSupplier.get());
                 lookup.put(node, new Node(node, messageSink, new SimpleConfigService(topology),
-                                          nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE, sinks, CommandStore.Factory.SINGLE_THREAD));
+                                          nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE, sinks, CommandStores.SingleThread::new));
             }
 
             List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
index ef2cb62132662baedd3e8c9062ff27f5fd6d7b1d..1be6e207f7fe89ca7a370be0ba949d141dbbc118 100644 (file)
@@ -10,7 +10,8 @@ public class MaelstromData extends TreeMap<Key, Value> implements Data
     @Override
     public Data merge(Data data)
     {
-        this.putAll(((MaelstromData)data));
+        if (data != null)
+            this.putAll((MaelstromData) data);
         return this;
     }
 }
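
The new null guard suggests merge is used to fold per-shard results together, and that a shard which read nothing contributes null rather than an empty map. A plausible call pattern (combine and shardReplies are illustrative, not the project's API):

    // Fold shard read results into one Data; null entries are now tolerated.
    static Data combine(List<Data> shardReplies)
    {
        Data merged = new MaelstromData();
        for (Data shard : shardReplies)
            merged = merged.merge(shard);
        return merged;
    }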
index 5585138e40f68d7ee94b45514b7e581ec39fede1..5cad157495480f4b71ac4c2e83db13fbae513b83 100644 (file)
@@ -18,6 +18,8 @@ public class MaelstromReplyContext implements ReplyContext
 
     public static long messageIdFor(ReplyContext replyContext)
     {
+        if (replyContext instanceof Packet)
+            return ((Packet) replyContext).body.msg_id;
         return ((MaelstromReplyContext) replyContext).messageId;
     }
 }
index 30c593094cc7b437b73fac425c6b1650dd70f6d8..1c4e7f9d6edbfe4bef96b3b37db9fd867eee3c7b 100644 (file)
@@ -15,6 +15,7 @@ import java.util.function.Supplier;
 
 import accord.coordinate.Timeout;
 import accord.local.CommandStore;
+import accord.local.CommandStores;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.Scheduler;
@@ -139,7 +140,8 @@ public class Main
             MaelstromInit init = (MaelstromInit) packet.body;
             topology = topologyFactory.toTopology(init.cluster);
             sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
-            on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis, MaelstromStore::new, MaelstromAgent.INSTANCE, scheduler, CommandStore.Factory.SINGLE_THREAD);
+            on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis,
+                          MaelstromStore::new, MaelstromAgent.INSTANCE, scheduler, CommandStores.SingleThread::new);
             err.println("Initialized node " + init.self);
             err.flush();
             sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id));
index b6ec2029dca624c3b369eecbfae44e986ada5804..24dbbe3ad30e16950d0fe56ba453fbbe92f2e934 100644 (file)
@@ -35,9 +35,9 @@ public class SimpleConfigService implements ConfigurationService
     }
 
     @Override
-    public Future<Void> fetchTopologyForEpoch(long epoch)
+    public void fetchTopologyForEpoch(long epoch)
     {
-        return SUCCESS;
+        // no-op: the topology is fixed at construction, so there is nothing to fetch
     }
 
     @Override
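
With the return type now void, a caller can no longer block on the fetch; completion has to surface through the configuration service's notification path instead. For this static-topology service the method is simply a no-op, but a non-trivial implementation might look roughly like the following (currentEpoch, fetchFromPeers, and the listeners collection are assumptions for illustration, not the project's API):

    // Hypothetical asynchronous fetch: report the result via listeners
    // rather than a returned Future.
    @Override
    public void fetchTopologyForEpoch(long epoch)
    {
        if (epoch <= currentEpoch())
            return;  // already known locally
        fetchFromPeers(epoch, topology ->
            listeners.forEach(listener -> listener.onTopologyUpdate(topology)));
    }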