Topology getTopologyForEpoch(long epoch);
/**
- * Method for reporting epochs the configuration service may not be aware of, and optionally running a supplied
- * runnable once the corresponding topology has been received and applied. If the configuration service is already
- * aware of the reported epoch, the runnable should be run immediately.
+ * Method for reporting epochs the configuration service may not be aware of. To be notified when the new epoch
+ * is available locally, use {@link accord.topology.TopologyManager#awaitEpoch(long)}
*/
- Future<Void> fetchTopologyForEpoch(long epoch);
+ void fetchTopologyForEpoch(long epoch);
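Illustrative sketch (not part of the patch): with the returned Future removed, call sites pair the fire-and-forget fetch with TopologyManager#awaitEpoch, as the hunks below do:

    node.configService().fetchTopologyForEpoch(epoch);
    node.topology().awaitEpoch(epoch).addListener(this::onEpochUpdate);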
/**
* Alert the configuration service of epochs it may not be aware of. This is called for every TxnRequest
volatile long supersedingEpoch = -1;
private final boolean fastPathPermitted;
private final Set<Id> successes = new HashSet<>();
- private final Set<Id> failures = new HashSet<>();
+ private Set<Id> failures;
public PreacceptTracker(Topologies topologies, boolean fastPathPermitted)
{
@Override
public void recordFailure(Id node)
{
+ if (failures == null)
+ failures = new HashSet<>();
failures.add(node);
super.recordFailure(node);
}
{
PreacceptTracker tracker = new PreacceptTracker(topologies, false);
successes.forEach(tracker::recordSuccess);
- failures.forEach(tracker::recordFailure);
+ if (failures != null)
+ failures.forEach(tracker::recordFailure);
return tracker;
}
{
return fastPathPermitted && super.hasMetFastPathCriteria();
}
+
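+ // slow path: a quorum has replied and either the fast path was never permitted or no further responses are outstanding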
+ boolean shouldSlowPathAccept()
+ {
+ return (!fastPathPermitted || !hasInFlight()) && hasReachedQuorum();
+ }
}
final Keys keys;
tryFailure(new Timeout());
// if no other responses are expected and the slow quorum has been satisfied, proceed
- if (shouldSlowPathAccept())
+ if (tracker.shouldSlowPathAccept())
onPreAccepted();
}
if (!needMessages.isEmpty())
node.send(needMessages, to -> new PreAccept(to, newTopologies, txnId, txn), this);
- if (shouldSlowPathAccept())
+ if (tracker.shouldSlowPathAccept())
onPreAccepted();
}
if (!fastPath && ok.witnessedAt.epoch > txnId.epoch)
{
if (tracker.recordSupersedingEpoch(ok.witnessedAt.epoch))
- node.configService().fetchTopologyForEpoch(ok.witnessedAt.epoch).addListener(this::onEpochUpdate);
+ {
+ node.configService().fetchTopologyForEpoch(ok.witnessedAt.epoch);
+ node.topology().awaitEpoch(ok.witnessedAt.epoch).addListener(this::onEpochUpdate);
+ }
}
- if (!tracker.hasSupersedingEpoch() && (tracker.hasMetFastPathCriteria() || shouldSlowPathAccept()))
+ if (!tracker.hasSupersedingEpoch() && (tracker.hasMetFastPathCriteria() || tracker.shouldSlowPathAccept()))
onPreAccepted();
}
}
}
- private boolean shouldSlowPathAccept()
- {
- return (!tracker.fastPathPermitted || !tracker.hasInFlight()) && tracker.hasReachedQuorum();
- }
-
private boolean isPreAccepted()
{
return preacceptOutcome != null;
{
long executeEpoch = agreed.executeAt.epoch;
ConfigurationService configService = node.configService();
- if (executeEpoch > configService.currentEpoch())
- return configService.fetchTopologyForEpoch(executeEpoch)
- .flatMap(v -> fetchEpochOrExecute(node, agreed));
+ if (executeEpoch > node.topology().epoch())
+ {
+ configService.fetchTopologyForEpoch(executeEpoch);
+ return node.topology().awaitEpoch(executeEpoch).flatMap(v -> fetchEpochOrExecute(node, agreed));
+ }
return Execute.execute(node, agreed);
}
return false;
}
- protected <V> V accumulate(BiFunction<T, V, V> function, V start)
+ protected <V> V foldl(BiFunction<T, V, V> function, V accumulator)
{
for (T tracker : trackers)
- start = function.apply(tracker, start);
- return start;
+ accumulator = function.apply(tracker, accumulator);
+ return accumulator;
}
public Set<Node.Id> nodes()
*/
public Set<Id> computeMinimalReadSetAndMarkInflight()
{
- Set<ReadShardTracker> toRead = accumulate((tracker, accumulate) -> {
+ Set<ReadShardTracker> toRead = foldl((tracker, accumulate) -> {
if (!tracker.shouldRead())
return accumulate;
{
return "Command{" +
"txnId=" + txnId +
+ ", status=" + status +
", txn=" + txn +
", executeAt=" + executeAt +
", deps=" + deps +
- ", status=" + status +
'}';
}
}
import accord.api.Key;
import accord.api.KeyRange;
import accord.api.Store;
+import accord.local.CommandStores.StoreGroup;
+import accord.local.Node.Id;
import accord.topology.KeyRanges;
import accord.topology.Topology;
import accord.txn.Keys;
import accord.txn.Timestamp;
import accord.txn.TxnId;
-import com.google.common.base.Preconditions;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.Promise;
import java.util.function.Function;
import java.util.function.Supplier;
+import com.google.common.base.Preconditions;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
+
/**
* Single threaded internal shard of accord transaction metadata
*/
Store store,
KeyRanges ranges,
Supplier<Topology> localTopologySupplier);
- Factory SYNCHRONIZED = Synchronized::new;
- Factory SINGLE_THREAD = SingleThread::new;
- Factory SINGLE_THREAD_DEBUG = SingleThreadDebug::new;
}
private final int generation;
return ranges;
}
- void purgeRanges(KeyRanges removed)
- {
- for (KeyRange range : removed)
- {
- NavigableMap<Key, CommandsForKey> subMap = commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive());
- Iterator<Key> keyIterator = subMap.keySet().iterator();
- while (keyIterator.hasNext())
- {
- Key key = keyIterator.next();
- CommandsForKey forKey = commandsForKey.get(key);
- if (forKey != null)
- {
- for (Command command : forKey)
- if (command.txn() != null && !ranges.intersects(command.txn().keys))
- commands.remove(command.txnId());
- }
- keyIterator.remove();
- }
- }
- }
-
public void forEpochCommands(KeyRanges ranges, long epoch, Consumer<Command> consumer)
{
Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, Integer.MIN_VALUE, Node.Id.NONE);
public boolean hashIntersects(Key key)
{
- return CommandStores.keyIndex(key, numShards) == index;
+ return StoreGroup.keyIndex(key, numShards) == index;
}
public boolean intersects(Keys keys)
return keys.any(ranges, this::hashIntersects);
}
+ public boolean contains(Key key)
+ {
+ return ranges.contains(key);
+ }
+
public static void onEach(Collection<CommandStore> stores, Consumer<? super CommandStore> consumer)
{
for (CommandStore store : stores)
public abstract Future<Void> process(Consumer<? super CommandStore> consumer);
+ public abstract <T> Future<T> process(Function<? super CommandStore, T> function);
+
public void processBlocking(Consumer<? super CommandStore> consumer)
{
try
}
catch (InterruptedException e)
{
- Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e);
}
catch (ExecutionException e)
{
public static class Synchronized extends CommandStore
{
- public Synchronized(int generation,
- int index,
- int numShards,
- Node.Id nodeId,
- Function<Timestamp, Timestamp> uniqueNow,
- Agent agent,
- Store store,
- KeyRanges ranges,
- Supplier<Topology> localTopologySupplier)
+ public Synchronized(int generation, int index, int numShards, Node.Id nodeId,
+ Function<Timestamp, Timestamp> uniqueNow,
+ Agent agent, Store store,
+ KeyRanges ranges, Supplier<Topology> localTopologySupplier)
{
super(generation, index, numShards, nodeId, uniqueNow, agent, store, ranges, localTopologySupplier);
}
return promise;
}
+ @Override
+ public <T> Future<T> process(Function<? super CommandStore, T> function)
+ {
+ AsyncPromise<T> promise = new AsyncPromise<>();
+ processInternal(function, promise);
+ return promise;
+ }
+
@Override
public void shutdown() {}
}
}
}
- public SingleThread(int generation,
- int index,
- int numShards,
- Node.Id nodeId,
- Function<Timestamp, Timestamp> uniqueNow,
- Agent agent,
- Store store,
- KeyRanges ranges,
- Supplier<Topology> localTopologySupplier)
+ private class FunctionWrapper<T> extends AsyncPromise<T> implements Runnable
+ {
+ private final Function<? super CommandStore, T> function;
+
+ public FunctionWrapper(Function<? super CommandStore, T> function)
+ {
+ this.function = function;
+ }
+
+ @Override
+ public void run()
+ {
+ processInternal(function, this);
+ }
+ }
+
+ public SingleThread(int generation, int index, int numShards, Node.Id nodeId,
+ Function<Timestamp, Timestamp> uniqueNow,
+ Agent agent, Store store,
+ KeyRanges ranges, Supplier<Topology> localTopologySupplier)
{
super(generation, index, numShards, nodeId, uniqueNow, agent, store, ranges, localTopologySupplier);
executor = Executors.newSingleThreadExecutor(r -> {
return future;
}
+ @Override
+ public <T> Future<T> process(Function<? super CommandStore, T> function)
+ {
+ FunctionWrapper<T> future = new FunctionWrapper<>(function);
+ executor.execute(future);
+ return future;
+ }
+
@Override
public void shutdown()
{
}
}
- public static class SingleThreadDebug extends SingleThread
+ static class Debug extends SingleThread
{
private final AtomicReference<Thread> expectedThread = new AtomicReference<>();
- public SingleThreadDebug(int generation,
- int index,
- int numShards,
- Node.Id nodeId,
- Function<Timestamp, Timestamp> uniqueNow,
- Agent agent,
- Store store,
- KeyRanges ranges,
- Supplier<Topology> localTopologySupplier)
+ public Debug(int generation, int index, int numShards, Node.Id nodeId,
+ Function<Timestamp, Timestamp> uniqueNow,
+ Agent agent, Store store,
+ KeyRanges ranges, Supplier<Topology> localTopologySupplier)
{
super(generation, index, numShards, nodeId, uniqueNow, agent, store, ranges, localTopologySupplier);
}
super.processInternal(consumer, future);
}
}
+
}
import accord.api.Agent;
import accord.api.Key;
import accord.api.Store;
+import accord.local.CommandStore.SingleThread;
+import accord.local.CommandStores.StoreGroups.Fold;
+import accord.local.Node.Id;
import accord.messages.TxnRequest;
import accord.topology.KeyRanges;
import accord.topology.Topology;
import accord.txn.Keys;
import accord.txn.Timestamp;
-import com.google.common.base.Preconditions;
import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.*;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
+
+import static java.lang.Boolean.FALSE;
/**
* Manages the single threaded metadata shards
*/
-public class CommandStores
+public abstract class CommandStores
{
+ public interface Factory
+ {
+ CommandStores create(int num, Node.Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store);
+ }
+
static class StoreGroup
{
final CommandStore[] stores;
public StoreGroup(CommandStore[] stores, KeyRanges ranges)
{
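+ // a group is limited to 64 stores so that matches() can encode the intersecting stores as bits of a single long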
+ Preconditions.checkArgument(stores.length <= 64);
this.stores = stores;
this.ranges = ranges;
}
- private static class AccumulatingBitset extends BitSet implements Keys.KeyAccumulator<Keys>
+ long all()
{
- final int numStores;
- boolean isDone = false;
- public AccumulatingBitset(int numStores)
- {
- super(numStores);
- this.numStores = numStores;
- }
-
- @Override
- public Keys accumulate(Key key, Keys keys)
- {
- int idx = keyIndex(key, numStores);
- if (get(idx))
- return keys;
- set(idx);
- isDone = cardinality() == numStores;
- return keys;
- }
-
- @Override
- public boolean isDone()
- {
- return isDone;
- }
+ return -1L >>> (64 - stores.length);
}
- public Stream<CommandStore> stream()
+ long matches(Keys keys)
{
- return StreamSupport.stream(new ShardSpliterator(stores), false);
+ return keys.foldl(ranges, StoreGroup::addKeyIndex, stores.length, 0L, -1L);
}
- public Stream<CommandStore> stream(Keys keys)
+ long matches(TxnRequest.Scope scope)
{
- AccumulatingBitset bitSet = new AccumulatingBitset(stores.length);
- keys.accumulate(ranges, bitSet, keys);
- if (bitSet.cardinality() == 0)
- return null;
- return StreamSupport.stream(new ShardSpliterator(stores, bitSet::get), false);
+ return matches(scope.keys());
}
- public Stream<CommandStore> stream(TxnRequest.Scope scope)
+ static long keyIndex(Key key, long numShards)
{
- AccumulatingBitset bitSet = new AccumulatingBitset(stores.length);
- for (int i=0, mi=scope.size(); i<mi; i++)
- {
- Keys keys = scope.get(i).keys;
- keys.accumulate(ranges, bitSet, keys);
- if (bitSet.isDone)
- break;
- }
- if (bitSet.cardinality() == 0)
- return null;
- return StreamSupport.stream(new ShardSpliterator(stores, bitSet::get), false);
+ return Integer.toUnsignedLong(key.keyHash()) % numShards;
}
- }
- static int keyIndex(Key key, int numShards)
- {
- return (int) (Integer.toUnsignedLong(key.keyHash()) % numShards);
+ private static long addKeyIndex(Key key, long numShards, long accumulate)
+ {
+ return accumulate | (1L << keyIndex(key, numShards));
+ }
}
static class StoreGroups
{
- static final StoreGroups EMPTY = new StoreGroups(new StoreGroup[0], Topology.EMPTY, Topology.EMPTY);
final StoreGroup[] groups;
final Topology global;
final Topology local;
+ final int size;
public StoreGroups(StoreGroup[] groups, Topology global, Topology local)
{
this.groups = groups;
this.global = global;
this.local = local;
+ int size = 0;
+ for (StoreGroup group : groups)
+ size += group.stores.length;
+ this.size = size;
}
StoreGroups withNewTopology(Topology global, Topology local)
return new StoreGroups(groups, global, local);
}
- public Stream<CommandStore> stream()
+ public int size()
{
- Stream<CommandStore> stream = null;
- for (StoreGroup group : groups)
- {
- Stream<CommandStore> nextStream = group.stream();
- if (nextStream == null) continue;
- stream = stream != null ? Stream.concat(stream, nextStream) : nextStream;
- }
-
- return stream != null ? stream : Stream.empty();
+ return size;
}
- public Stream<CommandStore> stream(Keys keys)
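+ // folds over the stores selected by 'bitset', which addresses consecutive groups starting at 'startGroup' (at most 64 stores per pass)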
+ private <I1, I2, O> O foldl(int startGroup, long bitset, Fold<? super I1, ? super I2, O> fold, I1 param1, I2 param2, O accumulator)
{
- Stream<CommandStore> stream = null;
- for (StoreGroup group : groups)
+ int groupIndex = startGroup;
+ StoreGroup group = groups[groupIndex];
+ int offset = 0;
+ while (true)
{
- Stream<CommandStore> nextStream = group.stream(keys);
- if (nextStream == null) continue;
- stream = stream != null ? Stream.concat(stream, nextStream) : nextStream;
+ int i = Long.numberOfTrailingZeros(bitset) - offset;
+ while (i < group.stores.length)
+ {
+ accumulator = fold.fold(group.stores[i], param1, param2, accumulator);
+ bitset ^= Long.lowestOneBit(bitset);
+ i = Long.numberOfTrailingZeros(bitset) - offset;
+ }
+
+ if (++groupIndex == groups.length)
+ break;
+
+ if (bitset == 0)
+ break;
+
+ offset += group.stores.length;
+ group = groups[groupIndex];
+ if (offset + group.stores.length > 64)
+ break;
}
+ return accumulator;
+ }
- return stream != null ? stream : Stream.empty();
+ interface Fold<I1, I2, O>
+ {
+ O fold(CommandStore store, I1 i1, I2 i2, O accumulator);
}
- public Stream<CommandStore> stream(TxnRequest.Scope scope)
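+ // 'select' maps a group to the bitmask of its stores intersecting 'scope'; consecutive groups are batched into a single 64-bit mask before folding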
+ <S, I1, I2, O> O foldl(ToLongBiFunction<StoreGroup, S> select, S scope, Fold<? super I1, ? super I2, O> fold, I1 param1, I2 param2, IntFunction<? extends O> factory)
{
- Stream<CommandStore> stream = null;
- for (StoreGroup group : groups)
+ O accumulator = null;
+ int startGroup = 0;
+ while (startGroup < groups.length)
{
- Stream<CommandStore> nextStream = group.stream(scope);
- if (nextStream == null) continue;
- stream = stream != null ? Stream.concat(stream, nextStream) : nextStream;
+ long bits = select.applyAsLong(groups[startGroup], scope);
+ if (bits == 0)
+ {
+ ++startGroup;
+ continue;
+ }
+
+ int offset = groups[startGroup].stores.length;
+ int endGroup = startGroup + 1;
+ while (endGroup < groups.length)
+ {
+ StoreGroup group = groups[endGroup];
+ if (offset + group.stores.length > 64)
+ break;
+
+ bits += select.applyAsLong(group, scope) << offset;
+ offset += group.stores.length;
+ ++endGroup;
+ }
+
+ if (accumulator == null)
+ accumulator = factory.apply(Long.bitCount(bits));
+
+ accumulator = foldl(startGroup, bits, fold, param1, param2, accumulator);
+ startGroup = endGroup;
}
- return stream != null ? stream : Stream.empty();
+ return accumulator;
}
}
private final Store store;
private final CommandStore.Factory shardFactory;
private final int numShards;
- private volatile StoreGroups groups = StoreGroups.EMPTY;
+ protected volatile StoreGroups groups = new StoreGroups(new StoreGroup[0], Topology.EMPTY, Topology.EMPTY);
public CommandStores(int num, Node.Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store, CommandStore.Factory shardFactory)
{
commandStore.shutdown();
}
- public Stream<CommandStore> stream()
+ protected abstract <S> void forEach(ToLongBiFunction<StoreGroup, S> select, S scope, Consumer<? super CommandStore> forEach);
+ protected abstract <S, T> T mapReduce(ToLongBiFunction<StoreGroup, S> select, S scope, Function<? super CommandStore, T> map, BiFunction<T, T, T> reduce);
+
+ public void forEach(Consumer<CommandStore> forEach)
+ {
+ forEach((s, i) -> s.all(), null, forEach);
+ }
+
+ public void forEach(Keys keys, Consumer<CommandStore> forEach)
+ {
+ forEach(StoreGroup::matches, keys, forEach);
+ }
+
+ public void forEach(TxnRequest.Scope scope, Consumer<CommandStore> forEach)
{
- return groups.stream();
+ forEach(StoreGroup::matches, scope, forEach);
}
- public Stream<CommandStore> forKeys(Keys keys)
+ public <T> T mapReduce(TxnRequest.Scope scope, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
{
- return groups.stream(keys);
+ return mapReduce(StoreGroup::matches, scope, map, reduce);
}
- public Stream<CommandStore> forScope(TxnRequest.Scope scope)
+ public <T extends Collection<CommandStore>> T collect(Keys keys, IntFunction<T> factory)
{
- return groups.stream(scope);
+ return groups.foldl(StoreGroup::matches, keys, CommandStores::append, null, null, factory);
+ }
+
+ public <T extends Collection<CommandStore>> T collect(TxnRequest.Scope scope, IntFunction<T> factory)
+ {
+ return groups.foldl(StoreGroup::matches, scope, CommandStores::append, null, null, factory);
+ }
+
+ private static <T extends Collection<CommandStore>> T append(CommandStore store, Object ignore1, Object ignore2, T to)
+ {
+ to.add(store);
+ return to;
}
public synchronized void updateTopology(Topology cluster)
return;
Topology local = cluster.forNode(node);
- KeyRanges currentRanges = Arrays.stream(current.groups).map(group -> group.ranges).reduce(KeyRanges.EMPTY, (l, r) -> l.union(r)).mergeTouching();
- KeyRanges added = local.ranges().difference(currentRanges);
+ KeyRanges added = local.ranges().difference(current.local.ranges());
+
+ for (StoreGroup group : groups.groups)
+ {
+ // FIXME: remove this (and the corresponding check in TopologyRandomizer) once lower bounds are implemented.
+ // In the meantime, the logic needed to support acquiring ranges that we previously replicated is pretty
+ // convoluted without the ability to jettison epochs.
+ Preconditions.checkState(!group.ranges.intersects(added));
+ }
if (added.isEmpty())
{
groups = new StoreGroups(newGroups, cluster, local);
}
- private static class ShardSpliterator implements Spliterator<CommandStore>
+ @VisibleForTesting
+ public CommandStore unsafeForKey(Key key)
{
- int i = 0;
- final CommandStore[] commandStores;
- final IntPredicate predicate;
-
- public ShardSpliterator(CommandStore[] commandStores, IntPredicate predicate)
+ for (StoreGroup group : groups.groups)
{
- this.commandStores = commandStores;
- this.predicate = predicate;
+ if (group.ranges.contains(key))
+ {
+ for (CommandStore store : group.stores)
+ {
+ if (store.hashIntersects(key))
+ return store;
+ }
+ }
}
+ throw new IllegalArgumentException();
+ }
- public ShardSpliterator(CommandStore[] commandStores)
+ public static class Synchronized extends CommandStores
+ {
+ public Synchronized(int num, Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
{
- this (commandStores, i -> true);
+ super(num, node, uniqueNow, agent, store, CommandStore.Synchronized::new);
}
@Override
- public boolean tryAdvance(Consumer<? super CommandStore> action)
+ protected <S, T> T mapReduce(ToLongBiFunction<StoreGroup, S> select, S scope, Function<? super CommandStore, T> map, BiFunction<T, T, T> reduce)
{
- while (i < commandStores.length)
- {
- int idx = i++;
- if (!predicate.test(idx))
- continue;
- try
- {
- commandStores[idx].process(action).get();
- break;
- }
- catch (InterruptedException | ExecutionException e)
- {
- throw new RuntimeException(e);
- }
-
- }
- return i < commandStores.length;
+ return groups.foldl(select, scope, (store, f, r, t) -> t == null ? f.apply(store) : r.apply(t, f.apply(store)), map, reduce, ignore -> null);
}
@Override
- public void forEachRemaining(Consumer<? super CommandStore> action)
+ protected <S> void forEach(ToLongBiFunction<StoreGroup, S> select, S scope, Consumer<? super CommandStore> forEach)
{
- if (i >= commandStores.length)
- return;
+ groups.foldl(select, scope, (store, f, r, t) -> { f.accept(store); return null; }, forEach, null, ignore -> FALSE);
+ }
+ }
- List<Future<Void>> futures = new ArrayList<>(commandStores.length - i);
- for (; i< commandStores.length; i++)
- {
- if (predicate.test(i))
- futures.add(commandStores[i].process(action));
- }
+ public static class SingleThread extends CommandStores
+ {
+ public SingleThread(int num, Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
+ {
+ this(num, node, uniqueNow, agent, store, CommandStore.SingleThread::new);
+ }
- try
- {
- for (int i=0, mi=futures.size(); i<mi; i++)
- futures.get(i).get();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- catch (ExecutionException e)
+ public SingleThread(int num, Node.Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store, CommandStore.Factory shardFactory)
+ {
+ super(num, node, uniqueNow, agent, store, shardFactory);
+ }
+
+ private <S, F, T> T mapReduce(ToLongBiFunction<StoreGroup, S> select, S scope, F f, Fold<F, ?, List<Future<T>>> fold, BiFunction<T, T, T> reduce)
+ {
+ List<Future<T>> futures = groups.foldl(select, scope, fold, f, null, ArrayList::new);
+ T result = null;
+ for (Future<T> future : futures)
{
- Throwable cause = e.getCause();
- throw new RuntimeException(cause != null ? cause : e);
+ try
+ {
+ T next = future.get();
+ if (result == null) result = next;
+ else result = reduce.apply(result, next);
+ }
+ catch (InterruptedException e)
+ {
+ throw new UncheckedInterruptedException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e.getCause());
+ }
}
+ return result;
}
@Override
- public Spliterator<CommandStore> trySplit()
+ protected <S, T> T mapReduce(ToLongBiFunction<StoreGroup, S> select, S scope, Function<? super CommandStore, T> map, BiFunction<T, T, T> reduce)
{
- return null;
+ return mapReduce(select, scope, map, (store, f, i, t) -> { t.add(store.process(f)); return t; }, reduce);
}
- @Override
- public long estimateSize()
+ protected <S> void forEach(ToLongBiFunction<StoreGroup, S> select, S scope, Consumer<? super CommandStore> forEach)
{
- return commandStores.length;
+ mapReduce(select, scope, forEach, (store, f, i, t) -> { t.add(store.process(f)); return t; }, (Void i1, Void i2) -> null);
}
+ }
- @Override
- public int characteristics()
+ public static class Debug extends SingleThread
+ {
+ public Debug(int num, Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
{
- return Spliterator.SIZED | Spliterator.NONNULL | Spliterator.DISTINCT | Spliterator.IMMUTABLE;
+ super(num, node, uniqueNow, agent, store, CommandStore.Debug::new);
}
}
+
}
{
return Iterators.concat(uncommitted.values().iterator(), committedByExecuteAt.values().iterator());
}
+
+ public boolean isEmpty()
+ {
+ return uncommitted.isEmpty() && committedById.isEmpty();
+ }
}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.IntFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
-import java.util.stream.Stream;
+
+import com.google.common.annotations.VisibleForTesting;
import accord.api.*;
import accord.coordinate.Coordinate;
private final Set<TxnId> pendingRecovery = Collections.newSetFromMap(new ConcurrentHashMap<>());
public Node(Id id, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier,
- Supplier<Store> dataSupplier, Agent agent, Scheduler scheduler, CommandStore.Factory commandStoreFactory)
+ Supplier<Store> dataSupplier, Agent agent, Scheduler scheduler, CommandStores.Factory factory)
{
this.id = id;
this.agent = agent;
this.now = new AtomicReference<>(new Timestamp(topology.epoch(), nowSupplier.getAsLong(), 0, id));
this.nowSupplier = nowSupplier;
this.scheduler = scheduler;
- this.commandStores = new CommandStores(numCommandShards(),
- id,
- this::uniqueNow,
- agent,
- dataSupplier.get(),
- commandStoreFactory);
+ this.commandStores = factory.create(numCommandShards(), id, this::uniqueNow, agent, dataSupplier.get());
configService.registerListener(this);
onTopologyUpdate(topology, false);
return nowSupplier.getAsLong();
}
- public Stream<CommandStore> local(Keys keys)
+ public void forEachLocal(Consumer<CommandStore> forEach)
{
- return commandStores.forKeys(keys);
+ commandStores.forEach(forEach);
}
- public Stream<CommandStore> local(Txn txn)
+ public void forEachLocal(Keys keys, Consumer<CommandStore> forEach)
{
- return commandStores.forKeys(txn.keys());
+ commandStores.forEach(keys, forEach);
}
- public Stream<CommandStore> local(TxnRequest.Scope scope)
+ public void forEachLocal(Txn txn, Consumer<CommandStore> forEach)
{
- return commandStores.forScope(scope);
+ forEachLocal(txn.keys, forEach);
}
- public Stream<CommandStore> local()
+ public void forEachLocal(TxnRequest.Scope scope, Consumer<CommandStore> forEach)
{
- return commandStores.stream();
+ commandStores.forEach(scope, forEach);
}
- public Optional<CommandStore> local(Key key)
+ public <T> T mapReduceLocal(TxnRequest.Scope scope, Function<CommandStore, T> map, BiFunction<T, T, T> reduce)
{
- return local(Keys.of(key)).reduce((i1, i2) -> {
- throw new IllegalStateException("more than one instance encountered for key");
- });
+ return commandStores.mapReduce(scope, map, reduce);
+ }
+
+ public <T extends Collection<CommandStore>> T collectLocal(Keys keys, IntFunction<T> factory)
+ {
+ return commandStores.collect(keys, factory);
+ }
+
+ public <T extends Collection<CommandStore>> T collectLocal(TxnRequest.Scope scope, IntFunction<T> factory)
+ {
+ return commandStores.collect(scope, factory);
}
// send to every node besides ourselves
// TODO: The combination of updating the epoch of the next timestamp with epochs we don’t have topologies for,
// and requiring preaccept to talk to its topology epoch means that learning of a new epoch via timestamp
// (ie not via config service) will halt any new txns from a node until it receives this topology
- if (txnId.epoch > configService.currentEpoch())
- return configService.fetchTopologyForEpoch(txnId.epoch).flatMap(v -> coordinate(txnId, txn));
+ if (txnId.epoch > topology().epoch())
+ {
+ configService.fetchTopologyForEpoch(txnId.epoch);
+ return topology().awaitEpoch(txnId.epoch).flatMap(v -> coordinate(txnId, txn));
+ }
Future<Result> result = Coordinate.execute(this, txnId, txn);
coordinating.put(txnId, result);
// TODO: encapsulate in Coordinate, so we can request that e.g. commits be re-sent?
public void recover(TxnId txnId, Txn txn)
{
- if (txnId.epoch > configService.currentEpoch())
+ if (txnId.epoch > topology.epoch())
{
- configService.fetchTopologyForEpoch(txnId.epoch).addListener(() -> recover(txnId, txn));
+ configService.fetchTopologyForEpoch(txnId.epoch);
+ topology().awaitEpoch(txnId.epoch).addListener(() -> recover(txnId, txn));
return;
}
if (result != null)
return;
- if (txnId.epoch > topology().epoch())
- {
- configService().fetchTopologyForEpoch(txnId.epoch).addListener(() -> recover(txnId, txn));
- return;
- }
-
result = Coordinate.recover(this, txnId, txn);
coordinating.putIfAbsent(txnId, result);
// TODO (now): error handling
long unknownEpoch = topology().maxUnknownEpoch(request);
if (unknownEpoch > 0)
{
- configService.fetchTopologyForEpoch(unknownEpoch).addListener(() -> receive(request, from, replyContext));
+ configService.fetchTopologyForEpoch(unknownEpoch);
+ topology().awaitEpoch(unknownEpoch).addListener(() -> receive(request, from, replyContext));
return;
}
scheduler.now(() -> request.process(this, from, replyContext));
{
return "Node{" + id + '}';
}
+
+ @VisibleForTesting
+ public CommandStore unsafeForKey(Key key)
+ {
+ return commandStores.unsafeForKey(key);
+ }
+
}
public void process(Node on, Node.Id replyToNode, ReplyContext replyContext)
{
- on.reply(replyToNode, replyContext, on.local(scope()).map(instance -> {
+ on.reply(replyToNode, replyContext, on.mapReduceLocal(scope(), instance -> {
Command command = instance.command(txnId);
if (!command.accept(ballot, txn, executeAt, deps))
return new AcceptNack(txnId, command.promised());
return new AcceptOk(txnId, calculateDeps(instance, txnId, txn, executeAt));
- }).reduce((r1, r2) -> {
+ }, (r1, r2) -> {
if (!r1.isOK()) return r1;
if (!r2.isOK()) return r2;
AcceptOk ok1 = (AcceptOk) r1;
if (ok2.deps.isEmpty()) return ok1;
ok1.deps.addAll(ok2.deps);
return ok1;
- }).orElseThrow());
+ }));
}
@Override
public void process(Node node, Id replyToNode, ReplyContext replyContext)
{
- node.local(scope()).forEach(instance -> instance.command(txnId).apply(txn, deps, executeAt, writes, result));
+ node.forEachLocal(scope(), instance -> instance.command(txnId).apply(txn, deps, executeAt, writes, result));
}
@Override
public void process(Node node, Id replyToNode, ReplyContext replyContext)
{
- RecoverReply reply = node.local(scope()).map(instance -> {
+ RecoverReply reply = node.mapReduceLocal(scope(), instance -> {
Command command = instance.command(txnId);
if (!command.recover(txn, ballot))
.collect(Dependencies::new, Dependencies::add, Dependencies::addAll);
}
return new RecoverOk(txnId, command.status(), command.accepted(), command.executeAt(), deps, earlierCommittedWitness, earlierAcceptedNoWitness, rejectsFastPath, command.writes(), command.result());
- }).reduce((r1, r2) -> {
+ }, (r1, r2) -> {
if (!r1.isOK()) return r1;
if (!r2.isOK()) return r2;
RecoverOk ok1 = (RecoverOk) r1;
ok1.earlierAcceptedNoWitness,
ok1.rejectsFastPath | ok2.rejectsFastPath,
ok1.writes, ok1.result);
- }).orElseThrow();
+ });
node.reply(replyToNode, replyContext, reply);
if (reply instanceof RecoverOk && ((RecoverOk) reply).status == Applied)
// disseminate directly
RecoverOk ok = (RecoverOk) reply;
ConfigurationService configService = node.configService();
- if (ok.executeAt.epoch > configService.currentEpoch())
+ if (ok.executeAt.epoch > node.topology().epoch())
{
- configService.fetchTopologyForEpoch(ok.executeAt.epoch).addListener(() -> disseminateApply(node, ok));
+ configService.fetchTopologyForEpoch(ok.executeAt.epoch);
+ node.topology().awaitEpoch(ok.executeAt.epoch).addListener(() -> disseminateApply(node, ok));
return;
}
disseminateApply(node, ok);
private void disseminateApply(Node node, RecoverOk ok)
{
Preconditions.checkArgument(ok.status == Applied);
+ if (ok.executeAt.epoch > node.epoch())
+ {
+ node.configService().fetchTopologyForEpoch(ok.executeAt.epoch);
+ node.topology().awaitEpoch(ok.executeAt.epoch).addListener(() -> disseminateApply(node, ok));
+ return;
+ }
Topologies topologies = node.topology().forKeys(txn.keys, ok.executeAt.epoch);
node.send(topologies.nodes(), to -> new Apply(to, topologies, txnId, txn, ok.executeAt, ok.deps, ok.writes, ok.result));
}
public void process(Node node, Id from, ReplyContext replyContext)
{
- node.local(scope()).forEach(instance -> instance.command(txnId).commit(txn, deps, executeAt));
+ node.forEachLocal(scope(), instance -> instance.command(txnId).commit(txn, deps, executeAt));
if (read) super.process(node, from, replyContext);
}
public void process(Node node, Id from, ReplyContext replyContext)
{
- node.reply(from, replyContext, node.local(scope()).map(instance -> {
+ node.reply(from, replyContext, node.mapReduceLocal(scope(), instance -> {
// note: this diverges from the paper, in that instead of waiting for JoinShard,
// we PreAccept to both old and new topologies and require quorums in both.
// This necessitates sending to ALL replicas of old topology, not only electorate (as fast path may be unreachable).
if (!command.witness(txn))
return PreAcceptNack.INSTANCE;
return new PreAcceptOk(txnId, command.executeAt(), calculateDeps(instance, txnId, txn, txnId));
- }).reduce((r1, r2) -> {
+ }, (r1, r2) -> {
if (!r1.isOK()) return r1;
if (!r2.isOK()) return r2;
PreAcceptOk ok1 = (PreAcceptOk) r1;
if (ok1 != okMax && !ok1.deps.isEmpty()) okMax.deps.addAll(ok1.deps);
if (ok2 != okMax && !ok2.deps.isEmpty()) okMax.deps.addAll(ok2.deps);
return okMax;
- }).orElseThrow());
+ }));
}
@Override
synchronized void setup(TxnId txnId, Txn txn, Scope scope)
{
// TODO: simple hash set supporting concurrent modification, or else avoid concurrent modification
- waitingOn = node.local(scope).collect(Collectors.toCollection(() -> new DeterministicIdentitySet<>()));
+ waitingOn = node.collectLocal(scope, DeterministicIdentitySet::new);
// FIXME: fix/check thread safety
CommandStore.onEach(waitingOn, instance -> {
Command command = instance.command(txnId);
import accord.txn.Keys;
import accord.txn.Txn;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import java.util.Objects;
public abstract class TxnRequest implements Request
*/
public static class Scope
{
- public static class KeysForEpoch
- {
- public final long epoch;
- public final Keys keys;
-
- public KeysForEpoch(long epoch, Keys keys)
- {
- this.epoch = epoch;
- this.keys = keys;
- }
-
- static KeysForEpoch forTopology(Topology topology, Node.Id node, Keys keys)
- {
- KeyRanges topologyRanges = topology.rangesForNode(node);
- if (topologyRanges == null)
- return null;
- topologyRanges = topologyRanges.intersection(keys);
- Keys scopeKeys = keys.intersection(topologyRanges);
- return !topologyRanges.isEmpty() ? new KeysForEpoch(topology.epoch(), scopeKeys) : null;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- KeysForEpoch that = (KeysForEpoch) o;
- return epoch == that.epoch && keys.equals(that.keys);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(epoch, keys);
- }
-
- @Override
- public String toString()
- {
- return "EpochRanges{" +
- "epoch=" + epoch +
- ", keys=" + keys +
- '}';
- }
- }
-
- private final long maxEpoch;
- private final KeysForEpoch[] epochs;
-
- public Scope(long maxEpoch, KeysForEpoch... epochKeys)
- {
- this.maxEpoch = maxEpoch;
- this.epochs = epochKeys;
- }
-
- public int size()
- {
- return epochs.length;
- }
+ private final long minRequiredEpoch;
+ private final Keys keys;
- public KeysForEpoch get(int i)
+ public Scope(long minRequiredEpoch, Keys keys)
{
- return epochs[i];
+ this.minRequiredEpoch = minRequiredEpoch;
+ this.keys = keys;
}
- public static Scope forTopologies(Node.Id node, Topologies topologies, Keys keys)
+ public static Scope forTopologies(Node.Id node, Topologies topologies, Keys txnKeys)
{
- List<KeysForEpoch> ranges = new ArrayList<>(topologies.size());
+ long minEpoch = 0;
+ Keys scopeKeys = Keys.EMPTY;
+ Keys lastKeys = null;
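+ // visit each topology; when an epoch covers keys for this node that the previously visited epoch did not, raise the scope's minimum required epoch and merge those keys into the scope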
for (int i=topologies.size() - 1; i>=0; i--)
{
Topology topology = topologies.get(i);
- KeysForEpoch keysForEpoch = KeysForEpoch.forTopology(topology, node, keys);
- if (keysForEpoch != null)
+ KeyRanges topologyRanges = topology.rangesForNode(node);
+ if (topologyRanges == null)
+ continue;
+ topologyRanges = topologyRanges.intersection(txnKeys);
+ Keys epochKeys = txnKeys.intersection(topologyRanges);
+ if (lastKeys == null || !lastKeys.containsAll(epochKeys))
{
- ranges.add(keysForEpoch);
+ minEpoch = topology.epoch();
+ scopeKeys = scopeKeys.merge(epochKeys);
}
+ lastKeys = epochKeys;
}
- return new Scope(topologies.currentEpoch(), ranges.toArray(KeysForEpoch[]::new));
+ return new Scope(minEpoch, scopeKeys);
}
public static Scope forTopologies(Node.Id node, Topologies topologies, Txn txn)
return forTopologies(node, topologies, txn.keys());
}
- public long maxEpoch()
- {
- return maxEpoch;
- }
-
- public boolean intersects(KeyRanges ranges)
+ public long minRequiredEpoch()
{
- for (KeysForEpoch keysForEpoch : this.epochs)
- {
- if (ranges.intersects(keysForEpoch.keys))
- return true;
- }
-
- return false;
+ return minRequiredEpoch;
}
public Keys keys()
{
- Keys keys = epochs[0].keys;
- for (int i = 1; i< epochs.length; i++)
- keys = keys.merge(epochs[i].keys);
return keys;
}
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- Scope that = (Scope) o;
- return maxEpoch == that.maxEpoch && Arrays.equals(epochs, that.epochs);
+ Scope scope = (Scope) o;
+ return minRequiredEpoch == scope.minRequiredEpoch && keys.equals(scope.keys);
}
@Override
public int hashCode()
{
- int result = Objects.hash(maxEpoch);
- result = 31 * result + Arrays.hashCode(epochs);
- return result;
+ return Objects.hash(minRequiredEpoch, keys);
}
@Override
public String toString()
{
- return "TxnRequestScope{" +
- "epochs=" + Arrays.toString(epochs) +
+ return "Scope{" +
+ "maxEpoch=" + minRequiredEpoch +
+ ", keys=" + keys +
'}';
}
}
package accord.messages;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import accord.local.*;
import accord.local.Node.Id;
node.reply(replyToNode, replyContext, WaitOnCommitOk.INSTANCE);
}
- void process(CommandStore instance)
+ void setup(CommandStore instance)
{
Command command = instance.command(txnId);
switch (command.status())
synchronized void setup(Keys keys)
{
- List<CommandStore> instances = node.local(keys).collect(Collectors.toList());
+ List<CommandStore> instances = node.collectLocal(keys, ArrayList::new);
waitingOn.set(instances.size());
- instances.forEach(instance -> instance.processBlocking(this::process));
+ instances.forEach(instance -> instance.processBlocking(this::setup));
}
}
return rangeIndexForKey(0, ranges.length, key);
}
+ public boolean contains(Key key)
+ {
+ return rangeIndexForKey(key) >= 0;
+ }
+
public int size()
{
return ranges.length;
return supersetRangeIndexes.length < shards.length;
}
- public boolean isSubsetOf(Topology topology)
- {
- return epoch() == topology.epoch() && Arrays.equals(this.shards, topology.shards);
- }
-
public Topology withEpoch(long epoch)
{
return new Topology(epoch, shards, ranges, nodeLookup, subsetOfRanges, supersetRangeIndexes);
return forKeys(select, (i, shard) -> true);
}
- public <T> T accumulateForKeys(Keys select, IndexedBiFunction<Shard, T, T> function, T start)
+ public <T> T foldl(Keys select, IndexedBiFunction<Shard, T, T> function, T accumulator)
{
int subsetIndex = 0;
for (int i = 0 ; i < select.size() ; )
throw new IllegalArgumentException("Range not found for " + select.get(i));
int supersetIndex = supersetRangeIndexes[subsetIndex];
Shard shard = shards[supersetIndex];
- start = function.apply(subsetIndex, shard, start);
+ accumulator = function.apply(subsetIndex, shard, accumulator);
// find the first key outside this range
i = shard.range.higherKeyIndex(select, i, select.size());
}
- return start;
+ return accumulator;
}
/**
import accord.txn.Txn;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import java.util.*;
import java.util.function.LongConsumer;
*/
public class TopologyManager implements ConfigurationService.Listener
{
+ private static final Future<Void> SUCCESS = ImmediateFuture.success(null);
class EpochState
{
private final Topology global;
private final Topology local;
private final QuorumTracker syncTracker;
private boolean syncComplete = false;
- private boolean prevSynced = false;
+ private boolean prevSynced;
EpochState(Topology global, boolean prevSynced)
{
this.global = global;
this.local = global.forNode(node);
this.syncTracker = new QuorumTracker(new Topologies.Singleton(global, false));
- if (prevSynced)
- markPrevSynced();
+ this.prevSynced = prevSynced;
}
void markPrevSynced()
return false;
if (syncComplete)
return true;
- Boolean result = global.accumulateForKeys(keys, (i, shard, acc) -> {
+ Boolean result = global.foldl(keys, (i, shard, acc) -> {
if (acc == Boolean.FALSE)
return acc;
return Boolean.valueOf(syncTracker.unsafeGet(i).hasReachedQuorum());
// until the superseding epoch has been applied
private final List<Set<Node.Id>> pendingSyncComplete;
- private Epochs(EpochState[] epochs, List<Set<Node.Id>> pendingSyncComplete)
+ // list of promises to be completed as newer epochs become active. This is to support processes that
+ // are waiting on future epochs to begin (i.e. txn requests from future epochs). Index 0 is for
+ // currentEpoch + 1
+ private final List<AsyncPromise<Void>> futureEpochFutures;
+
+ private Epochs(EpochState[] epochs, List<Set<Node.Id>> pendingSyncComplete, List<AsyncPromise<Void>> futureEpochFutures)
{
this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0;
this.pendingSyncComplete = pendingSyncComplete;
+ this.futureEpochFutures = futureEpochFutures;
for (int i=1; i<epochs.length; i++)
Preconditions.checkArgument(epochs[i].epoch() == epochs[i-1].epoch() - 1);
this.epochs = epochs;
private Epochs(EpochState[] epochs)
{
- this(epochs, new ArrayList<>());
+ this(epochs, new ArrayList<>(), new ArrayList<>());
+ }
+
+ public Future<Void> awaitEpoch(long epoch)
+ {
+ if (epoch <= currentEpoch)
+ return SUCCESS;
+
+ int diff = (int) (epoch - currentEpoch);
+ while (futureEpochFutures.size() < diff)
+ futureEpochFutures.add(new AsyncPromise<>());
+
+ return futureEpochFutures.get(diff - 1);
}
public long nextEpoch()
return epochs.length > 0 ? epochs[0].global : Topology.EMPTY;
}
- public Epochs add(Topology topology)
- {
- Preconditions.checkArgument(topology.epoch == nextEpoch());
- EpochState[] nextEpochs = new EpochState[epochs.length + 1];
- List<Set<Node.Id>> pendingSync = new ArrayList<>(pendingSyncComplete);
- if (!pendingSync.isEmpty())
- {
- EpochState current = epochs[0];
- if (epochs.length <= 1 || epochs[1].syncComplete())
- current.markPrevSynced();
- pendingSync.remove(0).forEach(current::recordSyncComplete);
- }
- System.arraycopy(epochs, 0, nextEpochs, 1, epochs.length);
-
- boolean prevSynced = epochs.length == 0 || epochs[0].syncComplete();
- nextEpochs[0] = new EpochState(topology, prevSynced);
- return new Epochs(nextEpochs, pendingSync);
- }
-
/**
* Mark sync complete for the given node/epoch, and if this epoch
* is now synced, update the prevSynced flag on superseding epochs
long maxUnknownEpoch(TxnRequest.Scope scope)
{
- EpochState lastState = null;
- for (int i=0, mi=scope.size(); i<mi; i++)
- {
- TxnRequest.Scope.KeysForEpoch requestRanges = scope.get(i);
- EpochState epochState = get(requestRanges.epoch);
-
- if (epochState != null)
- {
- lastState = epochState;
- }
- else if (lastState != null && lastState.local.ranges().intersects(requestRanges.keys))
- {
- // we don't have the most recent epoch, but still replicate the requested ranges
- continue;
- }
- else
- {
- // we don't have the most recent epoch, and we don't replicate the requested ranges
- return scope.maxEpoch();
- }
-
- // validate requested ranges
- KeyRanges localRanges = epochState.local.ranges();
- if (!localRanges.intersects(requestRanges.keys))
- throw new RuntimeException("Received request for ranges not replicated by this node");
- }
- if (scope.maxEpoch() > 0)
- epochReporter.accept(scope.maxEpoch());
-
+ if (currentEpoch < scope.minRequiredEpoch())
+ return scope.minRequiredEpoch();
return 0;
}
@Override
public synchronized void onTopologyUpdate(Topology topology)
{
- epochs = epochs.add(topology);
+ Epochs current = epochs;
+
+ Preconditions.checkArgument(topology.epoch == current.nextEpoch());
+ EpochState[] nextEpochs = new EpochState[current.epochs.length + 1];
+ List<Set<Node.Id>> pendingSync = new ArrayList<>(current.pendingSyncComplete);
+ if (!pendingSync.isEmpty())
+ {
+ EpochState currentEpoch = current.epochs[0];
+ if (current.epochs.length <= 1 || current.epochs[1].syncComplete())
+ currentEpoch.markPrevSynced();
+ pendingSync.remove(0).forEach(currentEpoch::recordSyncComplete);
+ }
+ System.arraycopy(current.epochs, 0, nextEpochs, 1, current.epochs.length);
+
+ boolean prevSynced = current.epochs.length == 0 || current.epochs[0].syncComplete();
+ nextEpochs[0] = new EpochState(topology, prevSynced);
+
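+ // if any caller was awaiting the epoch that has just become current, complete its promise after publishing the new Epochs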
+ List<AsyncPromise<Void>> futureEpochFutures = new ArrayList<>(current.futureEpochFutures);
+ AsyncPromise<Void> toComplete = !futureEpochFutures.isEmpty() ? futureEpochFutures.remove(0) : null;
+ epochs = new Epochs(nextEpochs, pendingSync, futureEpochFutures);
+ if (toComplete != null)
+ toComplete.trySuccess(null);
+ }
+
+ public synchronized Future<Void> awaitEpoch(long epoch)
+ {
+ return epochs.awaitEpoch(epoch);
}
@Override
import accord.api.Key;
import accord.api.KeyRange;
import accord.topology.KeyRanges;
-import com.google.common.base.Preconditions;
@SuppressWarnings("rawtypes")
public class Keys implements Iterable<Key>
return new Keys(selection);
}
+ /**
+ * Returns true if this collection contains every key in the given Keys
+ */
+ public boolean containsAll(Keys that)
+ {
+ if (isEmpty())
+ return that.isEmpty();
+
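+ // both key arrays are sorted: walk them in lockstep, using binarySearch to skip ahead in this.keys when the current keys differ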
+ for (int thisIdx=0, thatIdx=0, thisSize=size(), thatSize=that.size();
+ thatIdx<thatSize;
+ thisIdx++, thatIdx++)
+ {
+ if (thisIdx >= thisSize)
+ return false;
+
+ Key thatKey = that.keys[thatIdx];
+ Key thisKey = this.keys[thisIdx];
+ int cmp = thisKey.compareTo(thatKey);
+
+ if (cmp == 0)
+ continue;
+
+ // if this key is greater than that key, we can't contain that key
+ if (cmp > 0)
+ return false;
+
+ // if the search returns a non-negative index, a match was found and
+ // no further comparison is needed
+ thisIdx = Arrays.binarySearch(keys, thisIdx, thisSize, thatKey);
+ if (thisIdx < 0)
+ return false;
+ }
+
+ return true;
+ }
+
public Keys merge(Keys that)
{
int thisIdx = 0;
return result != null ? new Keys(result) : EMPTY;
}
- public interface KeyAccumulator<V>
+ public interface KeyFold<V>
{
- V accumulate(Key key, V value);
- default boolean isDone()
- {
- return false;
- }
+ V fold(Key key, V value);
}
/**
* Fold over the keys intersecting with the given ranges, applying the given fold function to each such key.
*/
- public <V> V accumulate(KeyRanges ranges, KeyAccumulator<V> accumulator, V value)
+ public <V> V foldl(KeyRanges ranges, KeyFold<V> fold, V accumulator)
{
int keyLB = 0;
int keyHB = size();
for (;rangeLB<rangeHB && keyLB<keyHB;)
{
Key key = keys[keyLB];
- rangeLB = ranges.rangeIndexForKey(rangeLB, ranges.size(), key);
+ rangeLB = ranges.rangeIndexForKey(rangeLB, rangeHB, key);
if (rangeLB < 0)
{
int highKey = range.higherKeyIndex(this, keyLB, keyHB);
for (int i=keyLB; i<highKey; i++)
- {
- value = accumulator.accumulate(keys[i], value);
- if (accumulator.isDone())
- return value;
- }
+ accumulator = fold.fold(keys[i], accumulator);
keyLB = highKey;
rangeLB++;
keyLB = -1 - keyLB;
}
- return value;
+ return accumulator;
}
- public <V> V accumulate(KeyRanges ranges, KeyAccumulator<V> accumulator)
+ public boolean any(KeyRanges ranges, Predicate<Key> predicate)
{
- return accumulate(ranges, accumulator, null);
+ return 1 == foldl(ranges, (key, i1, i2) -> predicate.test(key) ? 1 : 0, 0, 0, 1);
}
- private static class TerminatingKeyAccumulator<V> implements KeyAccumulator<V>
+ public interface FoldKeysToLong
{
- private boolean isDone = false;
- private final Predicate<Key> predicate;
+ long apply(Key key, long param, long prev);
+ }
- public TerminatingKeyAccumulator(Predicate<Key> predicate)
- {
- this.predicate = predicate;
- }
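+ // primitive-long fold over the keys intersecting 'ranges'; returns early once the accumulated value equals 'terminalValue'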
+ public long foldl(KeyRanges ranges, FoldKeysToLong fold, long param, long initialValue, long terminalValue)
+ {
+ int keyLB = 0;
+ int keyHB = size();
+ int rangeLB = 0;
+ int rangeHB = ranges.rangeIndexForKey(keys[keyHB-1]);
+ rangeHB = rangeHB < 0 ? -1 - rangeHB : rangeHB + 1;
- @Override
- final public V accumulate(Key key, V value)
+ for (;rangeLB<rangeHB && keyLB<keyHB;)
{
- Preconditions.checkState(!isDone);
- isDone = predicate.test(key);
- return value;
- }
+ Key key = keys[keyLB];
+ rangeLB = ranges.rangeIndexForKey(rangeLB, ranges.size(), key);
- @Override
- final public boolean isDone()
- {
- return isDone;
+ if (rangeLB < 0)
+ {
+ rangeLB = -1 -rangeLB;
+ if (rangeLB >= rangeHB)
+ break;
+ keyLB = ranges.get(rangeLB).lowKeyIndex(this, keyLB, keyHB);
+ }
+ else
+ {
+ KeyRange<?> range = ranges.get(rangeLB);
+ int highKey = range.higherKeyIndex(this, keyLB, keyHB);
+
+ for (int i=keyLB; i<highKey; i++)
+ {
+ initialValue = fold.apply(keys[i], param, initialValue);
+ if (terminalValue == initialValue)
+ return initialValue;
+ }
+
+ keyLB = highKey;
+ rangeLB++;
+ }
+
+ if (keyLB < 0)
+ keyLB = -1 - keyLB;
}
- }
- public boolean any(KeyRanges ranges, Predicate<Key> predicate)
- {
- TerminatingKeyAccumulator<Void> accumulator = new TerminatingKeyAccumulator<>(predicate);
- accumulate(ranges, accumulator, null);
- return accumulator.isDone();
+ return initialValue;
}
}
import accord.api.*;
import accord.local.*;
-import accord.topology.KeyRanges;
public class Txn
{
public Data read(Command command, Keys keyScope)
{
- return keyScope.accumulate(command.commandStore.ranges(), (key, accumulate) -> {
+ return keyScope.foldl(command.commandStore.ranges(), (key, accumulate) -> {
CommandStore commandStore = command.commandStore;
if (!commandStore.hashIntersects(key))
return accumulate;
Data result = read.read(key, command.executeAt(), commandStore.store());
return accumulate != null ? accumulate.merge(result) : result;
- });
+ }, null);
}
public Timestamp maxConflict(CommandStore commandStore)
if (write == null)
return;
- keys.accumulate(commandStore.ranges(), (key, accumulate) -> {
+ keys.foldl(commandStore.ranges(), (key, accumulate) -> {
if (commandStore.hashIntersects(key))
write.apply(key, executeAt, commandStore.store());
return accumulate;
- });
+ }, null);
}
@Override
}
// TODO: an identity hash map that doesn't mind concurrent modification / iteration
- final IdentityHashMap<T, Entry<T>> lookup = new IdentityHashMap<>();
+ final IdentityHashMap<T, Entry<T>> lookup;
final Entry<T> head = new Entry<T>(null);
public DeterministicIdentitySet()
+ {
+ this(0);
+ }
+
+ public DeterministicIdentitySet(int size)
{
head.prev = head.next = head;
+ lookup = new IdentityHashMap<>(size);
}
@Override
import java.util.concurrent.TimeUnit;
import accord.api.Scheduler;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
catch (InterruptedException e)
{
- throw new IllegalStateException(e);
+ throw new UncheckedInterruptedException(e);
}
}
}
package accord;
+import java.util.ArrayList;
+import java.util.List;
+
+import accord.api.Key;
import accord.api.KeyRange;
import accord.impl.IntKey;
import accord.topology.KeyRanges;
+import accord.txn.Keys;
+
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static accord.impl.IntKey.keys;
{
return new KeyRanges(ranges);
}
+
@Test
void intersectionTest()
{
assertEquals(keys(0, 1, 2, 3, 4),
keys(0, 2, 4).merge(keys(1, 3)));
}
+
+ @Test
+ void foldlTest()
+ {
+ List<Key> keys = new ArrayList<>();
+ long result = keys(150, 250, 350, 450, 550).foldl(ranges(r(200, 400)), (key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1);
+ assertEquals(16, result);
+ assertEquals(keys(250, 350), new Keys(keys));
+
+ keys.clear();
+ result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 500)), (key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1);
+ assertEquals(3616, result);
+ assertEquals(keys(150, 250, 350, 450), new Keys(keys));
+
+ keys.clear();
+ result = keys(150, 250, 350, 450, 550).foldl(ranges(r(500, 1000)), (key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1);
+ assertEquals(1, result);
+ assertEquals(keys(550), new Keys(keys));
+
+ keys.clear();
+ result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 20), r(100, 140), r(149, 151), r(560, 2000)), (key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1);
+ assertEquals(1, result);
+ assertEquals(keys(150), new Keys(keys));
+ }
+
+ @Test
+ void containsAll()
+ {
+ Keys keys = keys(150, 200, 250, 300, 350);
+ Assertions.assertTrue(keys.containsAll(keys(150, 200)));
+ Assertions.assertTrue(keys.containsAll(keys(150, 250)));
+ Assertions.assertTrue(keys.containsAll(keys(200, 250)));
+ Assertions.assertTrue(keys.containsAll(keys(200, 300)));
+ Assertions.assertTrue(keys.containsAll(keys(250, 300)));
+ Assertions.assertTrue(keys.containsAll(keys(250, 350)));
+
+ Assertions.assertFalse(keys.containsAll(keys(100, 150)));
+ Assertions.assertFalse(keys.containsAll(keys(100, 250)));
+ Assertions.assertFalse(keys.containsAll(keys(200, 225)));
+ Assertions.assertFalse(keys.containsAll(keys(225, 300)));
+ Assertions.assertFalse(keys.containsAll(keys(250, 235)));
+ Assertions.assertFalse(keys.containsAll(keys(250, 400)));
+
+ }
}
}
catch (Throwable t)
{
- logger.error("Exception running burn test:", t);
+ logger.error("Exception running burn test for seed {}:", seed, t);
throw t;
}
} while (overrideSeed == null);
import accord.local.Node;
import accord.messages.*;
import accord.topology.Topology;
+import com.google.common.base.Preconditions;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Function;
import java.util.function.Supplier;
-// TODO: merge with MockConfigurationService?
public class BurnTestConfigurationService implements TestableConfigurationService
{
private static final Logger logger = LoggerFactory.getLogger(BurnTestConfigurationService.class);
- private static final Future<Void> SUCCESS = ImmediateFuture.success(null);
private final Node.Id node;
private final MessageSink messageSink;
private final Function<Node.Id, Node> lookup;
private final Supplier<Random> randomSupplier;
- private final List<Topology> epochs = new ArrayList<>();
+ private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>();
+
+ private final EpochHistory epochs = new EpochHistory();
private final List<ConfigurationService.Listener> listeners = new ArrayList<>();
+ private static class EpochState
+ {
+ private final long epoch;
+ private final AsyncPromise<Topology> received = new AsyncPromise<>();
+ private final AsyncPromise<Void> acknowledged = new AsyncPromise<>();
+ private final AsyncPromise<Void> synced = new AsyncPromise<>();
+
+ private Topology topology = null;
+
+ public EpochState(long epoch)
+ {
+ this.epoch = epoch;
+ }
+ }
+
+ private static class EpochHistory
+ {
+ // TODO: move pendingEpochs / FetchTopology into here?
+ private final List<EpochState> epochs = new ArrayList<>();
+
+ private long lastReceived = 0;
+ private long lastAcknowledged = 0;
+ private long lastSyncd = 0;
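+ // epochs advance through received -> acknowledged -> synced in order, without skipping epochs (enforced by the checkState calls below)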
+
+ private EpochState get(long epoch)
+ {
+ for (long addEpoch = epochs.size(); addEpoch <= epoch; addEpoch++)
+ epochs.add(new EpochState(addEpoch));
+ return epochs.get((int) epoch);
+ }
+
+ EpochHistory receive(Topology topology)
+ {
+ long epoch = topology.epoch();
+ Preconditions.checkState(epoch == 0 || lastReceived == epoch - 1);
+ lastReceived = epoch;
+ EpochState state = get(epoch);
+ state.topology = topology;
+ state.received.setSuccess(topology);
+ return this;
+ }
+
+ Future<Topology> receiveFuture(long epoch)
+ {
+ return get(epoch).received;
+ }
+
+ Topology topologyFor(long epoch)
+ {
+ return get(epoch).topology;
+ }
+
+ EpochHistory acknowledge(long epoch)
+ {
+ Preconditions.checkState(epoch == 0 || lastAcknowledged == epoch - 1);
+ lastAcknowledged = epoch;
+ get(epoch).acknowledged.setSuccess(null);
+ return this;
+ }
+
+ Future<Void> acknowledgeFuture(long epoch)
+ {
+ return get(epoch).acknowledged;
+ }
+
+ EpochHistory syncComplete(long epoch)
+ {
+ Preconditions.checkState(epoch == 0 || lastSyncd == epoch - 1);
+ EpochState state = get(epoch);
+ Preconditions.checkState(state.received.isDone());
+ Preconditions.checkState(state.acknowledged.isDone());
+ lastSyncd = epoch;
+ state.synced.setSuccess(null);
+ return this;
+ }
+ }
+
public BurnTestConfigurationService(Node.Id node, MessageSink messageSink, Supplier<Random> randomSupplier, Topology topology, Function<Node.Id, Node> lookup)
{
this.node = node;
this.messageSink = messageSink;
this.randomSupplier = randomSupplier;
this.lookup = lookup;
- epochs.add(Topology.EMPTY);
- epochs.add(topology);
+ epochs.receive(Topology.EMPTY).acknowledge(0).syncComplete(0);
+ epochs.receive(topology).acknowledge(1).syncComplete(1);
}
@Override
@Override
public synchronized Topology currentTopology()
{
- return epochs.get(epochs.size() - 1);
+ return epochs.topologyFor(epochs.lastReceived);
}
@Override
public synchronized Topology getTopologyForEpoch(long epoch)
{
- return epoch >= epochs.size() ? null : epochs.get((int) epoch);
+ return epochs.topologyFor(epoch);
}
private static class FetchTopologyRequest implements Request
private final FetchTopologyRequest request;
private final List<Node.Id> candidates;
- private final Set<Runnable> onComplete = new HashSet<>();
-
public FetchTopology(long epoch)
{
this.request = new FetchTopologyRequest(epoch);
}
}
- private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>();
-
@Override
- public synchronized Future<Void> fetchTopologyForEpoch(long epoch)
+ public synchronized void fetchTopologyForEpoch(long epoch)
{
- if (epoch < epochs.size())
- {
- return SUCCESS;
- }
+ if (epoch <= epochs.lastReceived)
+ return;
- FetchTopology fetch = pendingEpochs.computeIfAbsent(epoch, FetchTopology::new);
- return fetch;
+ pendingEpochs.computeIfAbsent(epoch, FetchTopology::new);
}
@Override
- public void acknowledgeEpoch(long epoch)
+ public synchronized void acknowledgeEpoch(long epoch)
{
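+ // record the acknowledgement locally, then kick off sync of the previous epoch to the nodes of the acknowledged topology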
+ epochs.acknowledge(epoch);
Topology topology = getTopologyForEpoch(epoch);
Node originator = lookup.apply(node);
TopologyUpdate.syncEpoch(originator, epoch - 1, topology.nodes());
@Override
public synchronized void reportTopology(Topology topology)
{
- if (topology.epoch() < epochs.size())
+ long lastReceived = epochs.lastReceived;
+ if (topology.epoch() <= lastReceived)
return;
- if (topology.epoch() > epochs.size())
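+ // a preceding epoch is still missing: fetch it and replay this report once it has been received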
+ if (topology.epoch() > lastReceived + 1)
{
- fetchTopologyForEpoch(epochs.size() + 1).addListener(() -> reportTopology(topology));
+ fetchTopologyForEpoch(lastReceived + 1);
+ epochs.receiveFuture(lastReceived + 1).addListener(() -> reportTopology(topology));
return;
}
- logger.trace("Epoch {} received by {}", topology.epoch(), node);
- epochs.add(topology);
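+ // defer applying this epoch until the previous one has been acknowledged locally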
+ long lastAcked = epochs.lastAcknowledged;
+ if (topology.epoch() > lastAcked + 1)
+ {
+ epochs.acknowledgeFuture(lastAcked + 1).addListener(() -> reportTopology(topology));
+ return;
+ }
+ logger.trace("Epoch {} received by {}", topology.epoch(), node);
+ epochs.receive(topology);
for (Listener listener : listeners)
listener.onTopologyUpdate(topology);
fetch.setSuccess(null);
}
-
}
{
private static final Logger logger = LoggerFactory.getLogger(TopologyUpdate.class);
- private static class CountDownFuture<T> extends AsyncPromise<T>
- {
- private final AtomicInteger remaining;
- private final T value;
-
- public CountDownFuture(int count, T value)
- {
- this.remaining = new AtomicInteger(count);
- this.value = value;
- }
-
- public CountDownFuture(int count)
- {
- this(count, null);
- }
-
- public void countDown()
- {
- if (remaining.decrementAndGet() == 0)
- setSuccess(value);
- }
- }
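+ // epochs whose post-update sync has not yet completed; the burn test's topology randomizer uses
+ // this count to limit the number of topology changes in flight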
+ private static final Set<Long> pendingTopologies = Sets.newConcurrentHashSet();
private static class CommandSync
{
}
public void process(Node node)
{
- node.local(txn.keys()).forEach(commandStore -> {
+ node.forEachLocal(txn, commandStore -> {
switch (status)
{
case PreAccepted:
return stage.addCallback(dieOnException());
}
- private static <T> Future<Void> map(Collection<T> items, Function<T, Future<Void>> function)
- {
- CountDownFuture<Void> latch = new CountDownFuture<>(items.size());
- for (T item : items)
- {
- function.apply(item).addListener(latch::countDown);
- }
- return latch;
- }
-
public static MessageTask notify(Node originator, Collection<Node.Id> cluster, Topology update)
{
+ pendingTopologies.add(update.epoch());
return MessageTask.begin(originator, cluster, "TopologyNotify:" + update.epoch(), (node, from) -> {
long nodeEpoch = node.topology().epoch();
if (nodeEpoch + 1 < update.epoch())
});
}
- private static Future<Void> broadcast(List<Node.Id> cluster, Function<Node.Id, Node> lookup, String desc, BiConsumer<Node, Node.Id> process)
- {
- return map(cluster, node -> MessageTask.apply(lookup.apply(node), cluster, desc, process));
- }
-
private static Collection<Node.Id> allNodesFor(Txn txn, Topology... topologies)
{
Set<Node.Id> result = new HashSet<>();
Map<TxnId, CommandSync> syncMessages = new ConcurrentHashMap<>();
Consumer<Command> commandConsumer = command -> syncMessages.put(command.txnId(), new CommandSync(command));
if (committedOnly)
- node.local().forEach(commandStore -> commandStore.forCommittedInEpoch(ranges, epoch, commandConsumer));
+ node.forEachLocal(commandStore -> commandStore.forCommittedInEpoch(ranges, epoch, commandConsumer));
else
- node.local().forEach(commandStore -> commandStore.forEpochCommands(ranges, epoch, commandConsumer));
+ node.forEachLocal(commandStore -> commandStore.forEpochCommands(ranges, epoch, commandConsumer));
return syncMessages.values().stream().map(cmd -> MessageTask.of(node, recipients.apply(cmd), "Sync:" + cmd.txnId + ':' + epoch + ':' + forEpoch, cmd::process));
}
continue;
Set<Node.Id> newNodes = Sets.difference(nextShard.nodeSet, syncShard.nodeSet);
+
+ if (newNodes.isEmpty())
+ continue;
+
KeyRanges ranges = KeyRanges.singleton(intersection);
for (long epoch=1; epoch<syncEpoch; epoch++)
messageStream = Stream.concat(messageStream, syncEpochCommands(node,
return dieExceptionally(last);
}
- public static void update(Node originator, Topology update, List<Node.Id> cluster, Function<Node.Id, Node> lookup)
+ public static Future<Void> syncEpoch(Node originator, long epoch, Collection<Node.Id> cluster)
{
- long epoch = update.epoch();
- // notify
- dieExceptionally(notify(originator, cluster, update)
- // sync operations
- .flatMap(v -> map(cluster, node -> sync(lookup.apply(node), epoch - 1)))
- // inform sync complete
- .flatMap(v -> broadcast(cluster, lookup, "SyncComplete:" + epoch, (node, from) -> node.onEpochSyncComplete(from, epoch))));
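+ // sync the originator's commands for this epoch, then notify the cluster that the sync completed;
+ // clear the pending-topology marker once the whole chain finishes, successfully or not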
+ Future<Void> future = dieExceptionally(sync(originator, epoch)
+ .flatMap(v -> MessageTask.apply(originator, cluster, "SyncComplete:" + epoch, (node, from) -> node.onEpochSyncComplete(originator.id(), epoch))));
+ future.addCallback((unused, throwable) -> pendingTopologies.remove(epoch));
+ return future;
}
- public static Future<Void> syncEpoch(Node originator, long epoch, Collection<Node.Id> cluster)
+ public static int pendingTopologies()
{
- return dieExceptionally(sync(originator, epoch)
- .flatMap(v -> MessageTask.apply(originator, cluster, "SyncComplete:" + epoch, (node, from) -> node.onEpochSyncComplete(originator.id(), epoch))));
+ return pendingTopologies.size();
}
}
{
private static CommandStore getCommandShard(Node node, Key key)
{
- return node.local(key).orElseThrow();
+ return node.unsafeForKey(key);
}
private static Command getCommand(Node node, Key key, TxnId txnId)
TxnId txnId1 = node1.nextTxnId();
Txn txn1 = writeTxn(keys);
node1.coordinate(txnId1, txn1).get();
- node1.local(keys).forEach(commands -> {
+ node1.forEachLocal(keys, commands -> {
Command command = commands.command(txnId1);
Assertions.assertTrue(command.savedDeps().isEmpty());
});
// new nodes should have the previous epoch's operation as a dependency
cluster.nodes(4, 5, 6).forEach(node -> {
- node.local(keys).forEach(commands -> {
+ node.forEachLocal(keys, commands -> {
Command command = commands.command(txnId2);
Assertions.assertTrue(command.savedDeps().contains(txnId1));
});
});
- // old nodes should be aware of the new epoch
- cluster.configServices(1, 2, 3).forEach(config -> {
- Assertions.assertEquals(2, config.currentEpoch());
- });
-
// ...and participated in consensus
cluster.nodes(1, 2, 3).forEach(node -> {
- node.local(keys).forEach(commands -> {
+ node.forEachLocal(keys, commands -> {
Command command = commands.command(txnId2);
Assertions.assertTrue(command.hasBeen(Status.Committed));
});
RecordingMessageSink messageSink = (RecordingMessageSink) node1.messageSink();
messageSink.clearHistory();
TxnId txnId1 = coordinate(node1, keys);
- node1.local(keys).forEach(commands -> {
+ node1.forEachLocal(keys, commands -> {
Command command = commands.command(txnId1);
Assertions.assertTrue(command.savedDeps().isEmpty());
});
}).collect(Collectors.toSet());
Assertions.assertEquals(idSet(1, 2, 3), accepts);
- node1.local(keys).forEach(commands -> {
+ node1.forEachLocal(keys, commands -> {
Command command = commands.command(txnId2);
Assertions.assertTrue(command.hasBeen(Status.Committed));
Assertions.assertTrue(command.savedDeps().contains(txnId1));
messageSink.clearHistory();
TxnId txnId3 = coordinate(node1, keys);
Assertions.assertFalse(messageSink.requests.stream().anyMatch(env -> env.payload instanceof Accept));
- node1.local(keys).forEach(commands -> {
+ node1.forEachLocal(keys, commands -> {
Command command = commands.command(txnId3);
Assertions.assertTrue(command.hasBeen(Status.Committed));
Assertions.assertTrue(command.savedDeps().contains(txnId1));
import accord.api.MessageSink;
import accord.burn.BurnTestConfigurationService;
-import accord.local.CommandStore;
+import accord.local.CommandStores;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.Scheduler;
|| !partitionSet.contains(deliver.src) && !partitionSet.contains(deliver.dst));
if (drop)
{
- logger.trace("{} DROP[{}] {}", clock++, on.epoch(), deliver);
+ logger.debug("{} DROP[{}] {}", clock++, on.epoch(), deliver);
return true;
}
- logger.trace("{} RECV[{}] {}", clock++, on.epoch(), deliver);
+ logger.debug("{} RECV[{}] {}", clock++, on.epoch(), deliver);
if (deliver.message instanceof Reply)
{
Reply reply = (Reply) deliver.message;
MessageSink messageSink = sinks.create(node, randomSupplier.get());
BurnTestConfigurationService configService = new BurnTestConfigurationService(node, messageSink, randomSupplier, topology, lookup::get);
lookup.put(node, new Node(node, messageSink, configService,
- nowSupplier.get(), () -> new ListStore(node), ListAgent.INSTANCE, sinks, CommandStore.Factory.SYNCHRONIZED));
+ nowSupplier.get(), () -> new ListStore(node), ListAgent.INSTANCE, sinks, CommandStores.Synchronized::new));
}
List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
package accord.impl.list;
+import java.util.Arrays;
+import java.util.Map;
import java.util.TreeMap;
+import java.util.stream.Collectors;
import accord.api.Data;
import accord.api.Key;
@Override
public Data merge(Data data)
{
- this.putAll(((ListData)data));
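+ // a null Data may be passed in; treat it as having nothing to merge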
+ if (data != null)
+ this.putAll(((ListData)data));
return this;
}
+
+ @Override
+ public String toString()
+ {
+ return entrySet().stream()
+ .map(e -> e.getKey() + "=" + Arrays.toString(e.getValue()))
+ .collect(Collectors.joining(", ", "{", "}"));
+ }
}
package accord.impl.list;
+import java.util.Arrays;
import java.util.TreeMap;
+import java.util.stream.Collectors;
import accord.api.Key;
import accord.api.Store;
s.data.merge(key, new Timestamped<>(executeAt, data), Timestamped::merge);
logger.trace("WRITE on {} at {} key:{} -> {}", s.node, executeAt, key, data);
}
+
+ @Override
+ public String toString()
+ {
+ return entrySet().stream()
+ .map(e -> e.getKey() + "=" + Arrays.toString(e.getValue()))
+ .collect(Collectors.joining(", ", "{", "}"));
+ }
}
@Override
public void process(Node node, Node.Id from, ReplyContext replyContext)
{
- node.local().forEach(commandStore -> {
+ node.forEachLocal(commandStore -> {
Command command = commandStore.command(txnId);
command.commit(txn, deps, executeAt);
});
{
Map<TxnId, SyncMessage> syncMessages = new ConcurrentHashMap<>();
Consumer<Command> commandConsumer = command -> syncMessages.put(command.txnId(), new SyncMessage(command));
- node.local().forEach(commandStore -> commandStore.forCommittedInEpoch(syncTopology.ranges(), syncEpoch, commandConsumer));
+ node.forEachLocal(commandStore -> commandStore.forCommittedInEpoch(syncTopology.ranges(), syncEpoch, commandConsumer));
for (SyncMessage message : syncMessages.values())
CommandSync.sync(node, message, nextTopology);
import accord.api.MessageSink;
import accord.coordinate.Timeout;
import accord.impl.TopologyUtils;
-import accord.local.CommandStore;
+import accord.local.CommandStores;
import accord.local.Node;
import accord.local.Node.Id;
import accord.topology.KeyRanges;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
import java.util.function.LongSupplier;
import static accord.Utils.*;
() -> store,
new TestAgent(),
new ThreadPoolScheduler(),
- CommandStore.Factory.SINGLE_THREAD);
+ CommandStores.SingleThread::new);
}
private void init(Topology topology)
private final MessageSink messageSink;
private final List<Topology> epochs = new ArrayList<>();
private final List<Listener> listeners = new ArrayList<>();
- private final Map<Long, AsyncPromise<Void>> pending = new HashMap<>();
private final EpochFunction<MockConfigurationService> fetchTopologyHandler;
public MockConfigurationService(MessageSink messageSink, EpochFunction<MockConfigurationService> fetchTopologyHandler)
}
@Override
- public synchronized Future<Void> fetchTopologyForEpoch(long epoch)
+ public synchronized void fetchTopologyForEpoch(long epoch)
{
if (epoch < epochs.size())
- {
- return SUCCESS;
- }
+ return;
- Future<Void> future = pending.computeIfAbsent(epoch, e -> new AsyncPromise<>());
fetchTopologyHandler.apply(epoch, this);
- return future;
+ return;
}
@Override
for (Listener listener : listeners)
listener.onTopologyUpdate(topology);
-
- AsyncPromise<Void> promise = pending.remove(topology.epoch());
- if (promise == null)
- return;
- promise.setSuccess(null);
}
public synchronized void reportSyncComplete(Node.Id node, long epoch)
() -> store,
new TestAgent(),
scheduler,
- CommandStore.Factory.SINGLE_THREAD);
+ CommandStores.SingleThread::new);
}
private static TxnRequest.Scope scope(TxnId txnId, Txn txn)
{
- return new TxnRequest.Scope(txnId.epoch, new TxnRequest.Scope.KeysForEpoch(txnId.epoch, txn.keys()));
+ return new TxnRequest.Scope(txnId.epoch, txn.keys());
}
private static PreAccept preAccept(TxnId txnId, Txn txn)
try
{
IntKey key = IntKey.key(10);
- CommandStore commandStore = node.local(key).orElseThrow();
+ CommandStore commandStore = node.unsafeForKey(key);
Assertions.assertFalse(commandStore.hasCommandsForKey(key));
TxnId txnId = clock.idForNode(1, ID2);
try
{
IntKey key = IntKey.key(10);
- CommandStore commandStore = node.local(key).orElseThrow();
+ CommandStore commandStore = node.unsafeForKey(key);
Assertions.assertFalse(commandStore.hasCommandsForKey(key));
TxnId txnId = clock.idForNode(1, ID2);
try
{
IntKey key = IntKey.key(10);
- CommandStore commandStore = node.local(key).orElseThrow();
+ CommandStore commandStore = node.unsafeForKey(key);
configService(node).reportTopology(node.topology().current().withEpoch(2));
messageSink.clearHistory();
package accord.messages;
import accord.api.KeyRange;
-import accord.impl.IntKey;
import accord.messages.TxnRequest.Scope;
-import accord.messages.TxnRequest.Scope.KeysForEpoch;
import accord.topology.Topologies;
import accord.topology.Topology;
import accord.txn.Keys;
import static accord.Utils.*;
import static accord.Utils.idSet;
+import static accord.impl.IntKey.keys;
import static accord.impl.IntKey.range;
public class TxnRequestScopeTest
{
- private static KeysForEpoch epochRanges(long epoch, Keys keys)
+ private static Scope scope(long epoch, Keys keys)
{
- return new KeysForEpoch(epoch, keys);
- }
-
- private static KeysForEpoch epochRanges(long epoch, int... keys)
- {
- return epochRanges(epoch, IntKey.keys(keys));
- }
-
- private static Scope scope(long epoch, KeysForEpoch... epochKeys)
- {
- return new Scope(epoch, epochKeys);
+ return new Scope(epoch, keys);
}
@Test
void createDisjointScopeTest()
{
- Keys keys = IntKey.keys(150);
+ Keys keys = keys(150);
KeyRange range = range(100, 200);
Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
Topology topology2 = topology(2, shard(range, idList(4, 5, 6), idSet(4, 5)));
topologies.add(topology2);
topologies.add(topology1);
- Assertions.assertEquals(scope(2, epochRanges(1, 150)),
+ Assertions.assertEquals(scope(1, keys(150)),
Scope.forTopologies(id(1), topologies, keys));
- Assertions.assertEquals(scope(2, epochRanges(2, 150)),
+ Assertions.assertEquals(scope(2, keys(150)),
Scope.forTopologies(id(4), topologies, keys));
}
@Test
void movingRangeTest()
{
- Keys keys = IntKey.keys(150, 250);
+ Keys keys = keys(150, 250);
KeyRange range1 = range(100, 200);
KeyRange range2 = range(200, 300);
Topology topology1 = topology(1,
Topologies.Multi topologies = new Topologies.Multi();
topologies.add(topology2);
topologies.add(topology1);
- Assertions.assertEquals(scope(2, epochRanges(1, 150), epochRanges(2, 250)),
+ Assertions.assertEquals(scope(2, keys(150, 250)),
Scope.forTopologies(id(1), topologies, keys));
- Assertions.assertEquals(scope(2, epochRanges(1, 250), epochRanges(2, 150)),
+ Assertions.assertEquals(scope(2, keys(250, 150)),
Scope.forTopologies(id(4), topologies, keys));
}
}
private final Supplier<Random> randomSupplier;
private final Function<Node.Id, Node> lookup;
private final List<Topology> epochs = new ArrayList<>();
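+ // every range each node has replicated in any epoch so far; used to prevent assigning a node
+ // ranges it has replicated before (see reassignsRanges below)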
+ private final Map<Node.Id, KeyRanges> previouslyReplicated = new HashMap<>();
public TopologyRandomizer(Supplier<Random> randomSupplier, Topology initialTopology, Function<Node.Id, Node> lookup)
{
this.lookup = lookup;
this.epochs.add(Topology.EMPTY);
this.epochs.add(initialTopology);
+ for (Node.Id node : initialTopology.nodes())
+ previouslyReplicated.put(node, initialTopology.rangesForNode(node));
}
private enum UpdateType
return shards;
}
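+ // the ranges each node would newly replicate in `next` that it does not replicate in `current`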
+ private static Map<Node.Id, KeyRanges> getAdditions(Topology current, Topology next)
+ {
+ Map<Node.Id, KeyRanges> additions = new HashMap<>();
+ for (Node.Id node : next.nodes())
+ {
+ KeyRanges prev = current.rangesForNode(node);
+ if (prev == null) prev = KeyRanges.EMPTY;
+
+ KeyRanges added = next.rangesForNode(node).difference(prev);
+ if (added.isEmpty())
+ continue;
+
+ additions.put(node, added);
+ }
+ return additions;
+ }
+
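+ // true if the proposed shards would give any node back a range it replicated in an earlier epoch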
+ private static boolean reassignsRanges(Topology current, Shard[] nextShards, Map<Node.Id, KeyRanges> previouslyReplicated)
+ {
+ Topology next = new Topology(current.epoch + 1, nextShards);
+ Map<Node.Id, KeyRanges> additions = getAdditions(current, next);
+
+ for (Map.Entry<Node.Id, KeyRanges> entry : additions.entrySet())
+ {
+ if (previouslyReplicated.getOrDefault(entry.getKey(), KeyRanges.EMPTY).intersects(entry.getValue()))
+ return true;
+ }
+ return false;
+ }
+
public synchronized void maybeUpdateTopology()
{
- if (randomSupplier.get().nextInt(50) != 0)
+ // if we don't limit the number of pending topology changes in flight,
+ // the topology randomizer will keep the burn test busy indefinitely
+ if (TopologyUpdate.pendingTopologies() > 5 || randomSupplier.get().nextInt(200) != 0)
return;
Random random = randomSupplier.get();
Topology nextTopology = new Topology(current.epoch + 1, shards);
+ // FIXME: remove this (and the corresponding check in CommandStores) once lower bounds are implemented.
+ // In the meantime, the logic needed to support acquiring ranges that we previously replicated is pretty
+ // convoluted without the ability to jettison epochs.
+ if (reassignsRanges(current, shards, previouslyReplicated))
+ return;
+
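+ // remember the ranges each node acquires in this epoch so later updates never hand them back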
+ Map<Node.Id, KeyRanges> nextAdditions = getAdditions(current, nextTopology);
+ for (Map.Entry<Node.Id, KeyRanges> entry : nextAdditions.entrySet())
+ {
+ KeyRanges previous = previouslyReplicated.getOrDefault(entry.getKey(), KeyRanges.EMPTY);
+ KeyRanges added = entry.getValue();
+ KeyRanges merged = previous.merge(added).mergeTouching();
+ previouslyReplicated.put(entry.getKey(), merged);
+ }
+
logger.debug("topology update to: {} from: {}", nextTopology, current);
epochs.add(nextTopology);
List<Node.Id> recipients,
String desc, NodeProcess process)
{
+ Preconditions.checkArgument(!recipients.isEmpty());
this.originator = originator;
this.recipients = ImmutableList.copyOf(recipients);
this.desc = desc;
import java.util.function.Supplier;
import accord.coordinate.Timeout;
-import accord.local.CommandStore;
+import accord.local.CommandStores;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.MessageSink;
{
MessageSink messageSink = sinks.create(node, randomSupplier.get());
lookup.put(node, new Node(node, messageSink, new SimpleConfigService(topology),
- nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE, sinks, CommandStore.Factory.SINGLE_THREAD));
+ nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE, sinks, CommandStores.SingleThread::new));
}
List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
@Override
public Data merge(Data data)
{
- this.putAll(((MaelstromData)data));
+ if (data != null)
+ this.putAll(((MaelstromData)data));
return this;
}
}
public static long messageIdFor(ReplyContext replyContext)
{
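+ // a Packet can itself serve as the reply context; in that case the message id lives on its body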
+ if (replyContext instanceof Packet)
+ return ((Packet) replyContext).body.msg_id;
return ((MaelstromReplyContext) replyContext).messageId;
}
}
import accord.coordinate.Timeout;
import accord.local.CommandStore;
+import accord.local.CommandStores;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.Scheduler;
MaelstromInit init = (MaelstromInit) packet.body;
topology = topologyFactory.toTopology(init.cluster);
sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
- on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis, MaelstromStore::new, MaelstromAgent.INSTANCE, scheduler, CommandStore.Factory.SINGLE_THREAD);
+ on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis,
+ MaelstromStore::new, MaelstromAgent.INSTANCE, scheduler, CommandStores.SingleThread::new);
err.println("Initialized node " + init.self);
err.flush();
sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id));
}
@Override
- public Future<Void> fetchTopologyForEpoch(long epoch)
+ public void fetchTopologyForEpoch(long epoch)
{
- return SUCCESS;
+ return;
}
@Override