Efficient Dependencies (CASSANDRA-17111) [branch: trunk]
author: Benedict Elliott Smith <benedict@apache.org>
Wed, 17 Aug 2022 08:50:36 +0000 (09:50 +0100)
committer: Benedict Elliott Smith <benedict@apache.org>
Wed, 17 Aug 2022 08:57:32 +0000 (09:57 +0100)
Transaction dependencies must include only the TxnId of the dependent transactions, and the key they conflict on

patch by Benedict; reviewed by David Capwell for CASSANDRA-17111

131 files changed:
accord-core/build.gradle
accord-core/src/main/java/accord/api/Agent.java
accord-core/src/main/java/accord/api/Key.java
accord-core/src/main/java/accord/api/ProgressLog.java
accord-core/src/main/java/accord/api/Query.java
accord-core/src/main/java/accord/api/Read.java
accord-core/src/main/java/accord/api/Update.java
accord-core/src/main/java/accord/api/Write.java
accord-core/src/main/java/accord/coordinate/CheckOnCommitted.java
accord-core/src/main/java/accord/coordinate/CheckOnUncommitted.java
accord-core/src/main/java/accord/coordinate/CheckShardStatus.java
accord-core/src/main/java/accord/coordinate/Coordinate.java
accord-core/src/main/java/accord/coordinate/CoordinateFailed.java
accord-core/src/main/java/accord/coordinate/Execute.java
accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
accord-core/src/main/java/accord/coordinate/Invalidate.java
accord-core/src/main/java/accord/coordinate/MaybeRecover.java
accord-core/src/main/java/accord/coordinate/Persist.java
accord-core/src/main/java/accord/coordinate/Preempted.java
accord-core/src/main/java/accord/coordinate/Propose.java
accord-core/src/main/java/accord/coordinate/Recover.java
accord-core/src/main/java/accord/coordinate/Timeout.java
accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
accord-core/src/main/java/accord/impl/SimpleProgressLog.java
accord-core/src/main/java/accord/local/Command.java
accord-core/src/main/java/accord/local/CommandStore.java
accord-core/src/main/java/accord/local/CommandStores.java
accord-core/src/main/java/accord/local/CommandsForKey.java
accord-core/src/main/java/accord/local/Node.java
accord-core/src/main/java/accord/messages/Accept.java
accord-core/src/main/java/accord/messages/Apply.java
accord-core/src/main/java/accord/messages/BeginInvalidate.java
accord-core/src/main/java/accord/messages/BeginRecovery.java
accord-core/src/main/java/accord/messages/Callback.java
accord-core/src/main/java/accord/messages/CheckStatus.java
accord-core/src/main/java/accord/messages/Commit.java
accord-core/src/main/java/accord/messages/InformOfPersistence.java
accord-core/src/main/java/accord/messages/InformOfTxn.java
accord-core/src/main/java/accord/messages/PreAccept.java
accord-core/src/main/java/accord/messages/ReadData.java
accord-core/src/main/java/accord/messages/TxnRequest.java
accord-core/src/main/java/accord/messages/WaitOnCommit.java
accord-core/src/main/java/accord/primitives/Ballot.java [moved from accord-core/src/main/java/accord/txn/Ballot.java with 92% similarity]
accord-core/src/main/java/accord/primitives/Deps.java [new file with mode: 0644]
accord-core/src/main/java/accord/primitives/KeyRange.java [moved from accord-core/src/main/java/accord/topology/KeyRange.java with 51% similarity]
accord-core/src/main/java/accord/primitives/KeyRanges.java [new file with mode: 0644]
accord-core/src/main/java/accord/primitives/Keys.java [new file with mode: 0644]
accord-core/src/main/java/accord/primitives/Timestamp.java [moved from accord-core/src/main/java/accord/txn/Timestamp.java with 98% similarity]
accord-core/src/main/java/accord/primitives/TxnId.java [moved from accord-core/src/main/java/accord/txn/TxnId.java with 93% similarity]
accord-core/src/main/java/accord/topology/KeyRanges.java [deleted file]
accord-core/src/main/java/accord/topology/Shard.java
accord-core/src/main/java/accord/topology/Topology.java
accord-core/src/main/java/accord/topology/TopologyManager.java
accord-core/src/main/java/accord/txn/Dependencies.java [deleted file]
accord-core/src/main/java/accord/txn/Keys.java [deleted file]
accord-core/src/main/java/accord/txn/Txn.java
accord-core/src/main/java/accord/txn/Writes.java
accord-core/src/main/java/accord/utils/ArrayBuffers.java [new file with mode: 0644]
accord-core/src/main/java/accord/utils/AsymmetricComparator.java [new file with mode: 0644]
accord-core/src/main/java/accord/utils/IndexedFold.java [new file with mode: 0644]
accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java [new file with mode: 0644]
accord-core/src/main/java/accord/utils/IndexedFoldToLong.java [new file with mode: 0644]
accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java [new file with mode: 0644]
accord-core/src/main/java/accord/utils/IntrusiveLinkedList.java [deleted file]
accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java [deleted file]
accord-core/src/main/java/accord/utils/MergeIterator.java [new file with mode: 0644]
accord-core/src/main/java/accord/utils/SortedArrays.java [new file with mode: 0644]
accord-core/src/main/java/accord/utils/Timestamped.java
accord-core/src/main/java/org/apache/cassandra/concurrent/ImmediateExecutor.java
accord-core/src/test/java/accord/KeysTest.java
accord-core/src/test/java/accord/Utils.java
accord-core/src/test/java/accord/burn/BurnTest.java
accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
accord-core/src/test/java/accord/burn/TopologyUpdate.java
accord-core/src/test/java/accord/coordinate/CoordinateTest.java
accord-core/src/test/java/accord/coordinate/PreAcceptTrackerTest.java
accord-core/src/test/java/accord/coordinate/RecoverTest.java
accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java
accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
accord-core/src/test/java/accord/impl/IntHashKey.java
accord-core/src/test/java/accord/impl/IntKey.java
accord-core/src/test/java/accord/impl/TestAgent.java
accord-core/src/test/java/accord/impl/TopologyFactory.java
accord-core/src/test/java/accord/impl/TopologyUtils.java
accord-core/src/test/java/accord/impl/basic/Cluster.java
accord-core/src/test/java/accord/impl/basic/NodeSink.java
accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java
accord-core/src/test/java/accord/impl/list/ListAgent.java
accord-core/src/test/java/accord/impl/list/ListQuery.java
accord-core/src/test/java/accord/impl/list/ListRead.java
accord-core/src/test/java/accord/impl/list/ListRequest.java
accord-core/src/test/java/accord/impl/list/ListResult.java
accord-core/src/test/java/accord/impl/list/ListUpdate.java
accord-core/src/test/java/accord/impl/list/ListWrite.java
accord-core/src/test/java/accord/impl/mock/EpochSync.java
accord-core/src/test/java/accord/impl/mock/MockCluster.java
accord-core/src/test/java/accord/impl/mock/MockStore.java
accord-core/src/test/java/accord/local/CommandTest.java
accord-core/src/test/java/accord/local/NodeTest.java
accord-core/src/test/java/accord/messages/PreAcceptTest.java
accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
accord-core/src/test/java/accord/topology/TopologyManagerTest.java
accord-core/src/test/java/accord/topology/TopologyRandomizer.java
accord-core/src/test/java/accord/topology/TopologyTest.java
accord-core/src/test/java/accord/txn/DepsTest.java [new file with mode: 0644]
accord-core/src/test/java/accord/utils/Gen.java [new file with mode: 0644]
accord-core/src/test/java/accord/utils/GenTest.java [new file with mode: 0644]
accord-core/src/test/java/accord/utils/Gens.java [new file with mode: 0644]
accord-core/src/test/java/accord/utils/KeyRangeTest.java
accord-core/src/test/java/accord/utils/KeyRangesTest.java
accord-core/src/test/java/accord/utils/MessageTask.java
accord-core/src/test/java/accord/utils/Property.java [new file with mode: 0644]
accord-core/src/test/java/accord/utils/SortedArraysTest.java [new file with mode: 0644]
accord-core/src/test/resources/burn-logback-trace-rolling.xml [new file with mode: 0644]
accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
accord-maelstrom/src/main/java/accord/maelstrom/Datum.java
accord-maelstrom/src/main/java/accord/maelstrom/Json.java
accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReply.java
accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
accord-maelstrom/src/main/java/accord/maelstrom/MaelstromResult.java
accord-maelstrom/src/main/java/accord/maelstrom/MaelstromUpdate.java
accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
accord-maelstrom/src/main/java/accord/maelstrom/Main.java
accord-maelstrom/src/main/java/accord/maelstrom/Packet.java
accord-maelstrom/src/main/java/accord/maelstrom/TopologyFactory.java

index b48c9dce79ac1ab5a947fbb00e5111a57b9ee348..9aef59fc9f1196582d056acb6b4f88811d3d1020 100644 (file)
@@ -35,6 +35,7 @@ dependencies {
     implementation 'com.google.code.gson:gson:2.8.7'
     implementation 'ch.qos.logback:logback-classic:1.2.3'
     implementation 'io.netty:netty-all:4.1.58.Final'
+    implementation 'com.carrotsearch:hppc:0.8.1'
     testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
     testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
 }
index bb00725520f3892d3762804bbdfafaea67946f2c..1df0b67331ff8f76b1b9d988dcc53c111c533e76 100644 (file)
@@ -2,7 +2,7 @@ package accord.api;
 
 import accord.local.Node;
 import accord.local.Command;
-import accord.txn.Timestamp;
+import accord.primitives.Timestamp;
 
 /**
  * Facility for augmenting node behaviour at specific points
index 533d50cdf508b4512f4ca261e1d3f715b5474563..662685395e67f472558733bf35910b734e486064 100644 (file)
@@ -3,10 +3,10 @@ package accord.api;
 /**
  * A routing key for determining which shards are involved in a transaction
  */
-public interface Key<K extends Key<K>> extends Comparable<K>
+public interface Key extends Comparable<Key>
 {
     /**
      * Returns a hash code of a key to support accord internal sharding. Hash values for equal keys must be equal.
      */
-    int keyHash();
+    int routingHash();
 }
index 68e5879ec2f859ba71409077029c79ab326a99e9..20da3ba2a2962c925caeaccb57b1e0d886e3e108 100644 (file)
@@ -8,8 +8,8 @@ import accord.coordinate.CheckOnUncommitted;
 import accord.coordinate.InformHomeOfTxn;
 import accord.local.CommandStore;
 import accord.local.Node.Id;
-import accord.txn.Keys;
-import accord.txn.TxnId;
+import accord.primitives.Keys;
+import accord.primitives.TxnId;
 
 /**
  * This interface is responsible for managing incomplete transactions *and retrying them*.
index 158ff1c4ed733b8a536724b63daa67d543e4a008..62a7c0990477fe276853f4bae250f7a24080e344 100644 (file)
@@ -1,5 +1,7 @@
 package accord.api;
 
+import javax.annotation.Nullable;
+
 /**
  * The computational/transformation part of a client query
  */
@@ -9,5 +11,5 @@ public interface Query
      * Perform some transformation on the complete {@link Data} result of a {@link Read}
      * from some {@link DataStore}, to produce a {@link Result} to return to the client.
      */
-    Result compute(Data data);
+    Result compute(Data data, @Nullable Read read, @Nullable Update update);
 }
index a14ce3a852c1edba54d6cfbbc85228ecec303e1d..c7380fba4394190dda5390b7e1feb74a9db1aeee 100644 (file)
@@ -1,7 +1,7 @@
 package accord.api;
 
-import accord.txn.Keys;
-import accord.txn.Timestamp;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
 
 /**
  * A read to be performed on potentially multiple shards, the inputs of which may be fed to a {@link Query}
index d07c087587456fdb3c2c9c7e526e7ff39cfe6a56..5fc3b1a6a27427bb52b7ff110464a84fa190bac4 100644 (file)
@@ -1,5 +1,7 @@
 package accord.api;
 
+import accord.primitives.Keys;
+
 /**
  * A client-defined update operation (the write equivalent of a query).
  * Takes as input the data returned by {@code Read}, and returns a {@code Write}
@@ -7,5 +9,6 @@ package accord.api;
  */
 public interface Update
 {
+    Keys keys();
     Write apply(Data data);
 }
index 27841393985400557baa1f19d3521f9d101e735b..d32394768ba77d84b48e4acdc111f5f525191553 100644 (file)
@@ -1,6 +1,6 @@
 package accord.api;
 
-import accord.txn.Timestamp;
+import accord.primitives.Timestamp;
 
 /**
  * A collection of data to write to one or more stores
index ecef8ff913ac17090aa5d0f82717d4d0d0d81c6f..b01e6edadf5476a62a0b24a3a8e15e3c81f30bba 100644 (file)
@@ -6,9 +6,7 @@ import accord.local.Node;
 import accord.messages.CheckStatus.CheckStatusOkFull;
 import accord.messages.CheckStatus.IncludeInfo;
 import accord.topology.Shard;
-import accord.txn.Keys;
-import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 
 import static accord.local.Status.Executed;
 
index dbfae90de883a8c54f7ea65edca558778ccd3fa0..a38e7d607d7ae50d97ad80ebfc688fd4039258d8 100644 (file)
@@ -5,8 +5,8 @@ import accord.local.Command;
 import accord.local.Node;
 import accord.messages.CheckStatus.CheckStatusOkFull;
 import accord.topology.Shard;
-import accord.txn.Keys;
-import accord.txn.TxnId;
+import accord.primitives.Keys;
+import accord.primitives.TxnId;
 
 import static accord.local.Status.Committed;
 
index 98d060c1329bf371733344b898f8b636d3f6a305..d0e3f568ce0722c985dc01e209632af6f00e1f38 100644 (file)
@@ -13,7 +13,7 @@ import accord.messages.CheckStatus.CheckStatusOk;
 import accord.messages.CheckStatus.CheckStatusReply;
 import accord.messages.CheckStatus.IncludeInfo;
 import accord.topology.Shard;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 
 /**
@@ -131,7 +131,7 @@ public abstract class CheckShardStatus<T extends CheckStatusOk> extends AsyncFut
     }
 
     @Override
-    public void onCallbackFailure(Throwable failure)
+    public void onCallbackFailure(Id from, Throwable failure)
     {
         tryFailure(failure);
     }
index 9defcbd98cac22151147b3ddcb0b411346a86b3d..1194a1a1538442bb860520b4f6a3b72e1b4aad2a 100644 (file)
@@ -11,16 +11,16 @@ import accord.api.Result;
 import accord.coordinate.tracking.FastPathTracker;
 import accord.topology.Shard;
 import accord.topology.Topologies;
-import accord.txn.Ballot;
+import accord.primitives.Ballot;
 import accord.messages.Callback;
 import accord.local.Node;
-import accord.txn.Dependencies;
+import accord.primitives.Deps;
 import accord.local.Node.Id;
-import accord.txn.Timestamp;
+import accord.primitives.Timestamp;
 import accord.messages.PreAccept;
 import accord.messages.PreAccept.PreAcceptOk;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import accord.messages.PreAccept.PreAcceptReply;
 import com.google.common.collect.Sets;
 
@@ -180,7 +180,7 @@ public class Coordinate extends AsyncFuture<Result> implements Callback<PreAccep
     }
 
     @Override
-    public void onCallbackFailure(Throwable failure)
+    public void onCallbackFailure(Id from, Throwable failure)
     {
         tryFailure(failure);
     }
@@ -245,26 +245,17 @@ public class Coordinate extends AsyncFuture<Result> implements Callback<PreAccep
         if (tracker.hasMetFastPathCriteria())
         {
             preAcceptIsDone = true;
-            Dependencies deps = new Dependencies();
-            for (PreAcceptOk preAcceptOk : preAcceptOks)
-            {
-                if (preAcceptOk.witnessedAt.equals(txnId))
-                    deps.addAll(preAcceptOk.deps);
-            }
-
+            Deps deps = Deps.merge(preAcceptOks, ok -> ok.witnessedAt.equals(txnId) ? ok.deps : null);
             Execute.execute(node, txnId, txn, homeKey, txnId, deps, this);
         }
         else
         {
-            Dependencies deps = new Dependencies();
+            Deps deps = Deps.merge(preAcceptOks, ok -> ok.deps);
             Timestamp executeAt; {
-                Timestamp tmp = Timestamp.NONE;
+                Timestamp accumulate = Timestamp.NONE;
                 for (PreAcceptOk preAcceptOk : preAcceptOks)
-                {
-                    deps.addAll(preAcceptOk.deps);
-                    tmp = Timestamp.max(tmp, preAcceptOk.witnessedAt);
-                }
-                executeAt = tmp;
+                    accumulate = Timestamp.max(accumulate, preAcceptOk.witnessedAt);
+                executeAt = accumulate;
             }
 
             // TODO: perhaps don't submit Accept immediately if we almost have enough for fast-path,
index d772a1866e059605be7273cc4796fb5a03be781a..1a219dab18a94a516ad1a8687207cda90e09dcc9 100644 (file)
@@ -3,7 +3,7 @@ package accord.coordinate;
 import javax.annotation.Nullable;
 
 import accord.api.Key;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 
 /**
  * Thrown when a transaction exceeds its specified timeout for obtaining a result for a client
index 8c376b1bf05a02fc6d09be36e98bddd06f47db94..73424fb272e4cda242fec0b1fa25c374db00edc0 100644 (file)
@@ -9,10 +9,12 @@ import accord.coordinate.tracking.ReadTracker;
 import accord.api.Result;
 import accord.messages.Callback;
 import accord.local.Node;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
 import accord.topology.Topologies;
 import accord.txn.*;
 import accord.messages.ReadData.ReadReply;
-import accord.txn.Dependencies;
+import accord.primitives.Deps;
 import accord.local.Node.Id;
 import accord.messages.Commit;
 import accord.messages.ReadData;
@@ -25,21 +27,21 @@ class Execute implements Callback<ReadReply>
     final Txn txn;
     final Key homeKey;
     final Timestamp executeAt;
-    final Dependencies dependencies;
+    final Deps deps;
     final Topologies topologies;
     final ReadTracker readTracker;
     final BiConsumer<Result, Throwable> callback;
     private Data data;
     private boolean isDone;
 
-    private Execute(Node node, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Dependencies dependencies, BiConsumer<Result, Throwable> callback)
+    private Execute(Node node, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Deps deps, BiConsumer<Result, Throwable> callback)
     {
         this.node = node;
         this.txnId = txnId;
         this.txn = txn;
         this.homeKey = homeKey;
         this.executeAt = executeAt;
-        this.dependencies = dependencies;
+        this.deps = deps;
         this.topologies = node.topology().forEpoch(txn, executeAt.epoch);
         Topologies readTopologies = node.topology().forEpoch(txn.read.keys(), executeAt.epoch);
         this.readTracker = new ReadTracker(readTopologies);
@@ -49,12 +51,12 @@ class Execute implements Callback<ReadReply>
     private void start()
     {
         Set<Id> readSet = readTracker.computeMinimalReadSetAndMarkInflight();
-        Commit.commitAndRead(node, topologies, txnId, txn, homeKey, executeAt, dependencies, readSet, this);
+        Commit.commitAndRead(node, topologies, txnId, txn, homeKey, executeAt, deps, readSet, this);
     }
 
-    public static void execute(Node node, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Dependencies dependencies, BiConsumer<Result, Throwable> callback)
+    public static void execute(Node node, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Deps deps, BiConsumer<Result, Throwable> callback)
     {
-        Execute execute = new Execute(node, txnId, txn, homeKey, executeAt, dependencies, callback);
+        Execute execute = new Execute(node, txnId, txn, homeKey, executeAt, deps, callback);
         execute.start();
     }
 
@@ -84,7 +86,7 @@ class Execute implements Callback<ReadReply>
             isDone = true;
             Result result = txn.result(data);
             callback.accept(result, null);
-            Persist.persist(node, topologies, txnId, homeKey, txn, executeAt, dependencies, txn.execute(executeAt, data), result);
+            Persist.persist(node, topologies, txnId, homeKey, txn, executeAt, deps, txn.execute(executeAt, data), result);
         }
     }
 
@@ -126,7 +128,7 @@ class Execute implements Callback<ReadReply>
     }
 
     @Override
-    public void onCallbackFailure(Throwable failure)
+    public void onCallbackFailure(Id from, Throwable failure)
     {
         isDone = true;
         callback.accept(null, failure);
index 707a6eec5aa88366c6b83f608c6492f03db471f9..59b60fe7f818ee5c3cefe014393433925ac8d282 100644 (file)
@@ -9,7 +9,7 @@ import accord.messages.InformOfTxn;
 import accord.messages.InformOfTxn.InformOfTxnReply;
 import accord.topology.Shard;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.Future;
 
@@ -64,7 +64,7 @@ public class InformHomeOfTxn extends AsyncFuture<Void> implements Callback<Infor
     }
 
     @Override
-    public void onCallbackFailure(Throwable failure)
+    public void onCallbackFailure(Id from, Throwable failure)
     {
         tryFailure(failure);
     }
index e227ea0708a4a2fcf96d9a515488845f0e3d224d..29677b0b453f5295777ba06d8950e1f3b6ca0ea7 100644 (file)
@@ -2,9 +2,10 @@ package accord.coordinate;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.function.BiConsumer;
 
+import com.google.common.base.Preconditions;
+
 import accord.api.Key;
 import accord.api.Result;
 import accord.coordinate.Invalidate.Outcome;
@@ -18,10 +19,9 @@ import accord.messages.BeginRecovery.RecoverReply;
 import accord.messages.Callback;
 import accord.messages.Commit;
 import accord.topology.Shard;
-import accord.txn.Ballot;
-import accord.txn.Keys;
-import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.Ballot;
+import accord.primitives.Keys;
+import accord.primitives.TxnId;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 
 import static accord.coordinate.Propose.Invalidate.proposeInvalidate;
@@ -43,6 +43,7 @@ public class Invalidate extends AsyncFuture<Outcome> implements Callback<Recover
 
     private Invalidate(Node node, Shard shard, Ballot ballot, TxnId txnId, Keys someKeys, Key someKey)
     {
+        Preconditions.checkArgument(someKeys.contains(someKey));
         this.node = node;
         this.ballot = ballot;
         this.txnId = txnId;
@@ -110,21 +111,13 @@ public class Invalidate extends AsyncFuture<Outcome> implements Callback<Recover
                 case PreAccepted:
                     throw new IllegalStateException("Should only have Accepted or later statuses here");
                 case Accepted:
-                    node.withEpoch(acceptOrCommit.executeAt.epoch, () -> {
-                        Recover recover = new Recover(node, ballot, txnId, acceptOrCommit.txn, acceptOrCommit.homeKey);
-                        recover.addCallback(this);
-
-                        Set<Id> nodes = recover.tracker.topologies().copyOfNodes();
-                        for (int i = 0 ; i < invalidateOks.size() ; ++i)
-                        {
-                            if (invalidateOks.get(i).executeAt != null)
-                            {
-                                recover.onSuccess(invalidateOksFrom.get(i), invalidateOks.get(i));
-                                nodes.remove(invalidateOksFrom.get(i));
-                            }
-                        }
-                        recover.start(nodes);
-                    });
+                    // note: we do not propagate our responses to the Recover instance to avoid mistakes;
+                    //       since invalidate contacts only one key, only responses from nodes that replicate
+                    //       *only* that key for the transaction will be valid, as the shards on the replica
+                    //       that own the other keys may not have responded. It would be possible to filter
+                    //       replies now that we have the transaction, but safer to just start from scratch.
+                    Recover.recover(node, ballot, txnId, acceptOrCommit.txn, acceptOrCommit.homeKey)
+                           .addCallback(this);
                     return;
                 case AcceptedInvalidate:
                     break; // latest accept also invalidating, so we're on the same page and should finish our invalidation
@@ -183,7 +176,7 @@ public class Invalidate extends AsyncFuture<Outcome> implements Callback<Recover
     }
 
     @Override
-    public void onCallbackFailure(Throwable failure)
+    public void onCallbackFailure(Id from, Throwable failure)
     {
         tryFailure(failure);
     }
index f2ef119b2bffcfab588b65e2b90d95f36b1e8d5e..9220fea49e807b0c4a7b892a360281aee42b1a5d 100644 (file)
@@ -10,9 +10,9 @@ import accord.messages.CheckStatus.CheckStatusOkFull;
 import accord.messages.CheckStatus.IncludeInfo;
 import accord.messages.Commit;
 import accord.topology.Shard;
-import accord.txn.Ballot;
+import accord.primitives.Ballot;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import org.apache.cassandra.utils.concurrent.Future;
 
 import static accord.local.Status.Accepted;
@@ -70,7 +70,7 @@ public class MaybeRecover extends CheckShardStatus<CheckStatusOk> implements BiC
     {
         switch (max.status)
         {
-            default: throw new AssertionError();
+            default: throw new IllegalStateException();
             case NotWitnessed:
             case PreAccepted:
             case Accepted:
@@ -91,10 +91,12 @@ public class MaybeRecover extends CheckShardStatus<CheckStatusOk> implements BiC
             case Executed:
             case Applied:
                 CheckStatusOkFull full = (CheckStatusOkFull) max;
-                if (!max.hasExecutedOnAllShards)
-                    Persist.persistAndCommit(node, txnId, someKey, txn, full.executeAt, full.deps, full.writes, full.result);
-                else // TODO: we shouldn't need to do this? Should be handled by progress log once hasExecutedOnAllShards
-                    Commit.commit(node, txnId, txn, full.homeKey, full.executeAt, full.deps);
+                node.withEpoch(full.executeAt.epoch, () -> {
+                    if (!max.hasExecutedOnAllShards)
+                        Persist.persistAndCommit(node, txnId, someKey, txn, full.executeAt, full.deps, full.writes, full.result);
+                    else // TODO: we shouldn't need to do this?
+                        Commit.commit(node, txnId, txn, full.homeKey, full.executeAt, full.deps);
+                });
                 trySuccess(full);
                 break;
 
index 18c790dd2629b512a34483928f03c9ec45598c88..abf4dec127ac4d43294fe6ada0bd6c9f9f34c331 100644 (file)
@@ -15,10 +15,10 @@ import accord.messages.Commit;
 import accord.messages.InformOfPersistence;
 import accord.topology.Shard;
 import accord.topology.Topologies;
-import accord.txn.Dependencies;
-import accord.txn.Timestamp;
+import accord.primitives.Deps;
+import accord.primitives.Timestamp;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import accord.txn.Writes;
 
 // TODO: do not extend AsyncFuture, just use a simple BiConsumer callback
@@ -32,13 +32,13 @@ public class Persist implements Callback<ApplyOk>
     final Set<Id> persistedOn;
     boolean isDone;
 
-    public static void persist(Node node, Topologies topologies, TxnId txnId, Key homeKey, Txn txn, Timestamp executeAt, Dependencies deps, Writes writes, Result result)
+    public static void persist(Node node, Topologies topologies, TxnId txnId, Key homeKey, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
     {
         Persist persist = new Persist(node, topologies, txnId, homeKey, executeAt);
         node.send(topologies.nodes(), to -> new Apply(to, topologies, txnId, txn, homeKey, executeAt, deps, writes, result), persist);
     }
 
-    public static void persistAndCommit(Node node, TxnId txnId, Key homeKey, Txn txn, Timestamp executeAt, Dependencies deps, Writes writes, Result result)
+    public static void persistAndCommit(Node node, TxnId txnId, Key homeKey, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result)
     {
         Topologies persistTo = node.topology().preciseEpochs(txn, executeAt.epoch);
         Persist persist = new Persist(node, persistTo, txnId, homeKey, executeAt);
@@ -80,7 +80,7 @@ public class Persist implements Callback<ApplyOk>
     }
 
     @Override
-    public void onCallbackFailure(Throwable failure)
+    public void onCallbackFailure(Id from, Throwable failure)
     {
     }
 }
index 13fd136e3c9b89b1a5b8b6871d0d6da83192437c..7a7101977ec70eadfc8235d956ac71069c7ea1c0 100644 (file)
@@ -3,7 +3,7 @@ package accord.coordinate;
 import javax.annotation.Nullable;
 
 import accord.api.Key;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 
 /**
  * Thrown when a coordinator is preempted by another recovery
index e862a5e38c0f43155b50c2fad312c8a2ec193686..1b97f5f9c78eb7bffd91592580766417a2c4794b 100644 (file)
@@ -10,14 +10,14 @@ import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
 import accord.coordinate.tracking.QuorumTracker;
 import accord.topology.Shard;
 import accord.topology.Topologies;
-import accord.txn.Ballot;
+import accord.primitives.Ballot;
 import accord.messages.Callback;
 import accord.local.Node;
 import accord.local.Node.Id;
-import accord.txn.Timestamp;
-import accord.txn.Dependencies;
+import accord.primitives.Timestamp;
+import accord.primitives.Deps;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import accord.messages.Accept;
 import accord.messages.Accept.AcceptOk;
 import accord.messages.Accept.AcceptReply;
@@ -51,17 +51,17 @@ class Propose implements Callback<AcceptReply>
     }
 
     public static void propose(Node node, Ballot ballot, TxnId txnId, Txn txn, Key homeKey,
-                               Timestamp executeAt, Dependencies dependencies, BiConsumer<Result, Throwable> callback)
+                               Timestamp executeAt, Deps deps, BiConsumer<Result, Throwable> callback)
     {
         Topologies topologies = node.topology().withUnsyncEpochs(txn, txnId, executeAt);
-        propose(node, topologies, ballot, txnId, txn, homeKey, executeAt, dependencies, callback);
+        propose(node, topologies, ballot, txnId, txn, homeKey, executeAt, deps, callback);
     }
 
     public static void propose(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, Key homeKey,
-                               Timestamp executeAt, Dependencies dependencies, BiConsumer<Result, Throwable> callback)
+                               Timestamp executeAt, Deps deps, BiConsumer<Result, Throwable> callback)
     {
         Propose propose = new Propose(node, topologies, ballot, txnId, txn, homeKey, executeAt, callback);
-        node.send(propose.acceptTracker.nodes(), to -> new Accept(to, topologies, ballot, txnId, homeKey, txn, executeAt, dependencies), propose);
+        node.send(propose.acceptTracker.nodes(), to -> new Accept(to, topologies, ballot, txnId, homeKey, txn, executeAt, deps), propose);
     }
 
     @Override
@@ -94,7 +94,7 @@ class Propose implements Callback<AcceptReply>
     }
 
     @Override
-    public void onCallbackFailure(Throwable failure)
+    public void onCallbackFailure(Id from, Throwable failure)
     {
         isDone = true;
         callback.accept(null, failure);
@@ -103,10 +103,7 @@ class Propose implements Callback<AcceptReply>
     private void onAccepted()
     {
         isDone = true;
-        Dependencies deps = new Dependencies();
-        for (AcceptOk acceptOk : acceptOks)
-            deps.addAll(acceptOk.deps);
-
+        Deps deps = Deps.merge(acceptOks, ok -> ok.deps);
         Execute.execute(node, txnId, txn, homeKey, executeAt, deps, callback);
     }
 
@@ -161,7 +158,7 @@ class Propose implements Callback<AcceptReply>
         }
 
         @Override
-        public void onCallbackFailure(Throwable failure)
+        public void onCallbackFailure(Id from, Throwable failure)
         {
             tryFailure(failure);
         }
index eb9b7718c11c77ef52cff4483ed1593a848c1124..a6d0a7d5a6b91b56b0fe763ece5303d7928a1ee7 100644 (file)
@@ -2,7 +2,6 @@ package accord.coordinate;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
@@ -14,14 +13,15 @@ import accord.coordinate.tracking.QuorumTracker;
 import accord.messages.Commit;
 import accord.topology.Shard;
 import accord.topology.Topologies;
-import accord.txn.Ballot;
+import accord.primitives.Ballot;
 import accord.messages.Callback;
 import accord.local.Node;
 import accord.local.Node.Id;
-import accord.txn.Timestamp;
-import accord.txn.Dependencies;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
+import accord.primitives.Deps;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import accord.messages.BeginRecovery;
 import accord.messages.BeginRecovery.RecoverOk;
 import accord.messages.BeginRecovery.RecoverReply;
@@ -45,11 +45,11 @@ public class Recover extends AsyncFuture<Result> implements Callback<RecoverRepl
         //       are given earlier timestamps we can retry without restarting.
         final QuorumTracker tracker;
 
-        AwaitCommit(Node node, TxnId txnId, Txn txn)
+        AwaitCommit(Node node, TxnId txnId, Keys someKeys)
         {
-            Topologies topologies = node.topology().preciseEpochs(txn, txnId.epoch);
+            Topologies topologies = node.topology().preciseEpochs(someKeys, txnId.epoch);
             this.tracker = new QuorumTracker(topologies);
-            node.send(topologies.nodes(), to -> new WaitOnCommit(to, topologies, txnId, txn.keys()), this);
+            node.send(topologies.nodes(), to -> new WaitOnCommit(to, topologies, txnId, someKeys), this);
         }
 
         @Override
@@ -71,19 +71,20 @@ public class Recover extends AsyncFuture<Result> implements Callback<RecoverRepl
         }
 
         @Override
-        public void onCallbackFailure(Throwable failure)
+        public void onCallbackFailure(Id from, Throwable failure)
         {
             tryFailure(failure);
         }
     }
 
-    Future<Object> awaitCommits(Node node, Dependencies waitOn)
+    Future<Object> awaitCommits(Node node, Deps waitOn)
     {
-        AtomicInteger remaining = new AtomicInteger(waitOn.size());
+        AtomicInteger remaining = new AtomicInteger(waitOn.txnIdCount());
         Promise<Object> future = new AsyncPromise<>();
-        for (Map.Entry<TxnId, Txn> e : waitOn)
+        for (int i = 0 ; i < waitOn.txnIdCount() ; ++i)
         {
-            new AwaitCommit(node, e.getKey(), e.getValue()).addCallback((success, failure) -> {
+            TxnId txnId = waitOn.txnId(i);
+            new AwaitCommit(node, txnId, waitOn.someKeys(txnId)).addCallback((success, failure) -> {
                 if (future.isDone())
                     return;
                 if (success != null && remaining.decrementAndGet() == 0)
@@ -165,6 +166,11 @@ public class Recover extends AsyncFuture<Result> implements Callback<RecoverRepl
         return recover(node, ballot, txnId, txn, homeKey, topologies);
     }
 
+    public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn txn, Key homeKey)
+    {
+        return recover(node, ballot, txnId, txn, homeKey, node.topology().forEpoch(txn, txnId.epoch));
+    }
+
     public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn txn, Key homeKey, Topologies topologies)
     {
         Recover recover = new Recover(node, ballot, txnId, txn, homeKey, topologies);
@@ -254,16 +260,13 @@ public class Recover extends AsyncFuture<Result> implements Callback<RecoverRepl
         }
 
         // should all be PreAccept
+        Deps deps = Deps.merge(recoverOks, ok -> ok.deps);
+        Deps earlierAcceptedNoWitness = Deps.merge(recoverOks, ok -> ok.earlierAcceptedNoWitness);
+        Deps earlierCommittedWitness = Deps.merge(recoverOks, ok -> ok.earlierCommittedWitness);
         Timestamp maxExecuteAt = txnId;
-        Dependencies deps = new Dependencies();
-        Dependencies earlierAcceptedNoWitness = new Dependencies();
-        Dependencies earlierCommittedWitness = new Dependencies();
         boolean rejectsFastPath = false;
         for (RecoverOk ok : recoverOks)
         {
-            deps.addAll(ok.deps);
-            earlierAcceptedNoWitness.addAll(ok.earlierAcceptedNoWitness);
-            earlierCommittedWitness.addAll(ok.earlierCommittedWitness);
             maxExecuteAt = Timestamp.max(maxExecuteAt, ok.executeAt);
             rejectsFastPath |= ok.rejectsFastPath;
         }
@@ -275,7 +278,7 @@ public class Recover extends AsyncFuture<Result> implements Callback<RecoverRepl
         }
         else
         {
-            earlierAcceptedNoWitness.removeAll(earlierCommittedWitness);
+            earlierAcceptedNoWitness = earlierAcceptedNoWitness.without(earlierCommittedWitness::contains); // assign: Deps looks immutable here (cf. with(), merge()), so the filtered result must be captured
             if (!earlierAcceptedNoWitness.isEmpty())
             {
                 awaitCommits(node, earlierAcceptedNoWitness).addCallback((success, failure) -> {
@@ -311,7 +314,7 @@ public class Recover extends AsyncFuture<Result> implements Callback<RecoverRepl
     }
 
     @Override
-    public void onCallbackFailure(Throwable failure)
+    public void onCallbackFailure(Id from, Throwable failure)
     {
         tryFailure(failure);
     }
index f2be9dc3d61254d9ed8b4111fe1a64bf53ef0161..4e6a93bb9983a5eba283996b2e2f85d967fb4348 100644 (file)
@@ -3,7 +3,7 @@ package accord.coordinate;
 import javax.annotation.Nullable;
 
 import accord.api.Key;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 
 /**
  * Thrown when a transaction exceeds its specified timeout for obtaining a result for a client
index 8f8fb61a8c6217404f47e60d341bab6b28355e7b..bcc7181020c0bba57cd20336221acbc587433047 100644 (file)
@@ -116,7 +116,7 @@ public class ReadTracker extends AbstractResponseTracker<ReadTracker.ReadShardTr
                 return accumulate;
 
             if (accumulate == null)
-                accumulate = new HashSet<>();
+                accumulate = new LinkedHashSet<>(); // determinism
 
             accumulate.add(tracker);
             return accumulate;
index 00bf967642a29ca3f0b043dcb53a46080ba2a8b2..c63cce784ee03612534cf15766f40f3f0ac6ecc7 100644 (file)
@@ -16,7 +16,6 @@ import accord.api.Key;
 import accord.api.ProgressLog;
 import accord.api.Result;
 import accord.coordinate.CheckOnCommitted;
-import accord.coordinate.CheckShardStatus;
 import accord.coordinate.Invalidate;
 import accord.impl.SimpleProgressLog.HomeState.LocalStatus;
 import accord.local.Command;
@@ -32,12 +31,12 @@ import accord.messages.Reply;
 import accord.messages.ReplyContext;
 import accord.topology.Shard;
 import accord.topology.Topologies;
-import accord.txn.Ballot;
-import accord.txn.Dependencies;
-import accord.txn.Keys;
-import accord.txn.Timestamp;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import accord.txn.Writes;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.Future;
@@ -173,15 +172,15 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory
             }
         }
 
-        private void refreshGlobal(@Nullable Node node, @Nullable Command command, @Nullable Id persistedOn, @Nullable Set<Id> persistedOns)
+        private boolean refreshGlobal(@Nullable Node node, @Nullable Command command, @Nullable Id persistedOn, @Nullable Set<Id> persistedOns)
         {
             if (global == NotExecuted)
-                return;
+                return false;
 
             if (globalPendingDurable != null)
             {
                 if (node == null || command == null || command.is(Status.NotWitnessed))
-                    return;
+                    return false;
 
                 if (persistedOns == null) persistedOns = globalPendingDurable.persistedOn;
                 else persistedOns.addAll(globalPendingDurable.persistedOn);
@@ -194,7 +193,7 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory
             {
                 assert node != null && command != null;
                 if (!node.topology().hasEpoch(command.executeAt().epoch))
-                    return;
+                    return false;
 
                 globalNotPersisted = new HashSet<>(node.topology().preciseEpochs(command.txn(), command.executeAt().epoch).nodes());
                 if (local == LocalStatus.Done)
@@ -213,6 +212,8 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory
                     globalProgress = Done;
                 }
             }
+
+            return true;
         }
 
         void executedOnAllShards(Node node, Command command, Set<Id> persistedOn)
@@ -329,7 +330,8 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory
 
         void updateGlobal(Node node, TxnId txnId, Command command)
         {
-            refreshGlobal(node, command, null, null);
+            if (!refreshGlobal(node, command, null, null))
+                return;
 
             if (global != Disseminating)
                 return;
@@ -375,6 +377,9 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory
         void recordBlocking(Command blocking, Keys someKeys)
         {
             this.blocking = blocking;
+            if (blocking.txn() != null && !blocking.txn().keys.containsAll(someKeys))
+                throw new IllegalStateException(String.format("The transaction does not involve some of the keys on which another transaction has taken a dependency upon it (%s vs %s)", blocking.txn().keys, someKeys));
+
             switch (blocking.status())
             {
                 default: throw new IllegalStateException();
@@ -423,8 +428,11 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory
             // 2. otherwise record the homeKey for future reference and set the status based on whether progress has been made
             long onEpoch = (command.hasBeen(Status.Committed) ? command.executeAt() : txnId).epoch;
             node.withEpoch(onEpoch, () -> {
-                Keys someKeys = this.someKeys == null ? command.someKeys() : this.someKeys;
-                Key someKey = this.someKeys == null ? command.someKey() : someKeys.get(0);
+                Key someKey; Keys someKeys; {
+                    Keys tmpKeys = Keys.union(this.someKeys, command.someKeys());
+                    someKey = command.homeKey() == null ? tmpKeys.get(0) : command.homeKey();
+                    someKeys = tmpKeys.with(someKey);
+                }
 
                 Shard someShard = node.topology().forEpoch(someKey, onEpoch);
                 CheckOnCommitted check = blockedOn == Executed ? checkOnCommitted(node, txnId, someKey, someShard, onEpoch)
@@ -441,6 +449,13 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory
                     switch (success.status)
                     {
                         default: throw new IllegalStateException();
+                        case AcceptedInvalidate:
+                            // we may or may not know the homeShard at this point; if the response doesn't know
+                            // then assume we potentially need to pick up the invalidation (deliberate fall-through to NotWitnessed)
+                            if (success.homeKey != null)
+                                break;
+                            // TODO: probably don't want to immediately go to Invalidate,
+                            //       instead first give in-flight one a chance to complete
                         case NotWitnessed:
                             progress = Investigating;
                             // TODO: this should instead invalidate the transaction on this shard, which invalidates it for all shards,
@@ -463,7 +478,8 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory
                             break;
                         case PreAccepted:
                         case Accepted:
-                        case AcceptedInvalidate:
+                            // either it's the home shard and it's managing progress,
+                            // or we now know the home shard and will contact it next time
                             break;
                         case Committed:
                         case ReadyToExecute:
@@ -770,7 +786,7 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory
         }
 
         @Override
-        public void onCallbackFailure(Throwable failure)
+        public void onCallbackFailure(Id from, Throwable failure)
         {
             tryFailure(failure);
         }
@@ -779,7 +795,7 @@ public class SimpleProgressLog implements Runnable, ProgressLog.Factory
     public static class ApplyAndCheck extends Apply
     {
         final Set<Id> notPersisted;
-        ApplyAndCheck(Id id, Topologies topologies, TxnId txnId, Txn txn, Key homeKey, Dependencies deps, Timestamp executeAt, Writes writes, Result result, Set<Id> notPersisted)
+        ApplyAndCheck(Id id, Topologies topologies, TxnId txnId, Txn txn, Key homeKey, Deps deps, Timestamp executeAt, Writes writes, Result result, Set<Id> notPersisted)
         {
             super(id, topologies, txnId, txn, homeKey, executeAt, deps, writes, result);
             this.notPersisted = notPersisted;
index 54de17f30505989bcc7b31d335a6e650221991fe..36b3a8d1494e401005bcff0fd1024a7984d1c906 100644 (file)
@@ -9,13 +9,13 @@ import com.google.common.base.Preconditions;
 import accord.api.Key;
 import accord.api.Result;
 import accord.local.Node.Id;
-import accord.topology.KeyRanges;
-import accord.txn.Ballot;
-import accord.txn.Dependencies;
-import accord.txn.Keys;
-import accord.txn.Timestamp;
+import accord.primitives.KeyRanges;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import accord.txn.Writes;
 
 import static accord.local.Status.Accepted;
@@ -37,7 +37,7 @@ public class Command implements Listener, Consumer<Listener>
     private Txn txn; // TODO: only store this on the home shard, or split to each shard independently
     private Ballot promised = Ballot.ZERO, accepted = Ballot.ZERO;
     private Timestamp executeAt; // TODO: compress these states together
-    private Dependencies deps = new Dependencies();
+    private Deps deps = Deps.NONE;
     private Writes writes;
     private Result result;
 
@@ -80,7 +80,7 @@ public class Command implements Listener, Consumer<Listener>
         return executeAt;
     }
 
-    public Dependencies savedDeps()
+    public Deps savedDeps()
     {
         return deps;
     }
@@ -146,7 +146,7 @@ public class Command implements Listener, Consumer<Listener>
             executeAt = txnId.compareTo(max) > 0 && txnId.epoch >= commandStore.latestEpoch()
                         ? txnId : commandStore.uniqueNow(max);
 
-            txn.keys().foldl(commandStore.ranges().since(txnId.epoch), (key, param) -> {
+            txn.keys().foldl(commandStore.ranges().since(txnId.epoch), (i, key, param) -> {
                 if (commandStore.hashIntersects(key))
                     commandStore.commandsForKey(key).register(this);
                 return null;
@@ -170,7 +170,7 @@ public class Command implements Listener, Consumer<Listener>
         return true;
     }
 
-    public boolean accept(Ballot ballot, Txn txn, Key homeKey, Key progressKey, Timestamp executeAt, Dependencies deps)
+    public boolean accept(Ballot ballot, Txn txn, Key homeKey, Key progressKey, Timestamp executeAt, Deps deps)
     {
         if (this.promised.compareTo(ballot) > 0)
             return false;
@@ -207,7 +207,7 @@ public class Command implements Listener, Consumer<Listener>
     }
 
     // relies on mutual exclusion for each key
-    public boolean commit(Txn txn, Key homeKey, Key progressKey, Timestamp executeAt, Dependencies deps)
+    public boolean commit(Txn txn, Key homeKey, Key progressKey, Timestamp executeAt, Deps deps)
     {
         if (hasBeen(Committed))
         {
@@ -224,36 +224,38 @@ public class Command implements Listener, Consumer<Listener>
         this.waitingOnCommit = new TreeMap<>();
         this.waitingOnApply = new TreeMap<>();
 
-        for (TxnId id : savedDeps().on(commandStore, executeAt))
+        KeyRanges ranges = commandStore.ranges().since(executeAt.epoch);
+        if (ranges != null)
         {
-            Command command = commandStore.command(id);
-            switch (command.status)
-            {
-                default:
-                    throw new IllegalStateException();
-                case NotWitnessed:
-                    Txn depTxn = savedDeps().get(id);
-                    command.txn(depTxn);
-                case PreAccepted:
-                case Accepted:
-                case AcceptedInvalidate:
-                    // we don't know when these dependencies will execute, and cannot execute until we do
-                    waitingOnCommit.put(id, command);
-                    command.addListener(this);
-                    break;
-                case Committed:
-                    // TODO: split into ReadyToRead and ReadyToWrite;
-                    //       the distributed read can be performed as soon as those keys are ready, and in parallel with any other reads
-                    //       the client can even ACK immediately after; only the write needs to be postponed until other in-progress reads complete
-                case ReadyToExecute:
-                case Executed:
-                case Applied:
-                    command.addListener(this);
-                    updatePredecessor(command);
-                case Invalidated:
-                    break;
-            }
+            savedDeps().forEachOn(ranges, commandStore::hashIntersects, txnId -> {
+                Command command = commandStore.command(txnId);
+                switch (command.status)
+                {
+                    default:
+                        throw new IllegalStateException();
+                    case NotWitnessed:
+                    case PreAccepted:
+                    case Accepted:
+                    case AcceptedInvalidate:
+                        // we don't know when these dependencies will execute, and cannot execute until we do
+                        command.addListener(this);
+                        waitingOnCommit.put(txnId, command);
+                        break;
+                    case Committed:
+                        // TODO: split into ReadyToRead and ReadyToWrite;
+                        //       the distributed read can be performed as soon as those keys are ready, and in parallel with any other reads
+                        //       the client can even ACK immediately after; only the write needs to be postponed until other in-progress reads complete
+                    case ReadyToExecute:
+                    case Executed:
+                    case Applied:
+                        command.addListener(this);
+                        updatePredecessor(command);
+                    case Invalidated:
+                        break;
+                }
+            });
         }
+
         if (waitingOnCommit.isEmpty())
         {
             waitingOnCommit = null;
@@ -261,8 +263,6 @@ public class Command implements Listener, Consumer<Listener>
                 waitingOnApply = null;
         }
 
-        // TODO: we might not be the homeShard for later phases if we are no longer replicas of the range at executeAt;
-        //       this should be fine, but it might be helpful to provide this info to the progressLog here?
         boolean isProgressShard = progressKey != null && handles(txnId.epoch, progressKey);
         commandStore.progressLog().commit(txnId, isProgressShard, isProgressShard && progressKey.equals(homeKey));
 
@@ -290,7 +290,7 @@ public class Command implements Listener, Consumer<Listener>
         return true;
     }
 
-    public boolean apply(Txn txn, Key homeKey, Key progressKey, Timestamp executeAt, Dependencies deps, Writes writes, Result result)
+    public boolean apply(Txn txn, Key homeKey, Key progressKey, Timestamp executeAt, Deps deps, Writes writes, Result result)
     {
         if (hasBeen(Executed) && executeAt.equals(this.executeAt))
             return false;
@@ -335,10 +335,9 @@ public class Command implements Listener, Consumer<Listener>
         return true;
     }
 
-    public Command addListener(Listener listener)
+    public boolean addListener(Listener listener)
     {
-        listeners.add(listener);
-        return this;
+        return listeners.add(listener);
     }
 
     public void removeListener(Listener listener)
@@ -474,7 +473,7 @@ public class Command implements Listener, Consumer<Listener>
 
         Keys someKeys = cur.someKeys();
         if (someKeys == null)
-            someKeys = prev.deps.get(cur.txnId).keys;
+            someKeys = prev.deps.someKeys(cur.txnId);
         return new BlockedBy(cur.txnId, someKeys);
     }
 
@@ -497,7 +496,7 @@ public class Command implements Listener, Consumer<Listener>
     public void homeKey(Key homeKey)
     {
         if (this.homeKey == null) this.homeKey = homeKey;
-        else if (!this.homeKey.equals(homeKey)) throw new AssertionError();
+        else if (!this.homeKey.equals(homeKey)) throw new IllegalStateException();
     }
 
     public Keys someKeys()
@@ -508,17 +507,6 @@ public class Command implements Listener, Consumer<Listener>
         return null;
     }
 
-    public Key someKey()
-    {
-        if (homeKey != null)
-            return homeKey;
-
-        if (txn.keys != null)
-            return txn.keys.get(0);
-
-        return null;
-    }
-
     /**
      * A key nominated to be the primary shard within this node for managing progress of the command.
      * It is nominated only as of txnId.epoch, and may be null (indicating that this node does not monitor
@@ -534,7 +522,7 @@ public class Command implements Listener, Consumer<Listener>
     public void progressKey(Key progressKey)
     {
         if (this.progressKey == null) this.progressKey = progressKey;
-        else if (!this.progressKey.equals(progressKey)) throw new AssertionError();
+        else if (!this.progressKey.equals(progressKey)) throw new IllegalStateException();
     }
 
     // does this specific Command instance execute (i.e. does it lose ownership post Commit)
@@ -547,7 +535,7 @@ public class Command implements Listener, Consumer<Listener>
     public void txn(Txn txn)
     {
         if (this.txn == null) this.txn = txn;
-        else if (!this.txn.equals(txn)) throw new AssertionError();
+        else if (!this.txn.equals(txn)) throw new IllegalStateException();
     }
 
     public boolean handles(long epoch, Key someKey)
index 32e3ec9809ac2e7d3502041a48f9edabd314e6b3..e69369163ac360e2068adc4094b6cb44682f0f6c 100644 (file)
@@ -4,12 +4,12 @@ import accord.api.Agent;
 import accord.api.Key;
 import accord.local.CommandStores.ShardedRanges;
 import accord.api.ProgressLog;
-import accord.topology.KeyRange;
+import accord.primitives.KeyRange;
 import accord.api.DataStore;
-import accord.topology.KeyRanges;
-import accord.txn.Keys;
-import accord.txn.Timestamp;
-import accord.txn.TxnId;
+import accord.primitives.KeyRanges;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.Promise;
index 2f5abc5f49589fc02c26d214a2dd2f2ab979de0f..b2fd8b62bd8b01c457b1ff05a2e60060571a338f 100644 (file)
@@ -4,11 +4,10 @@ import accord.api.Agent;
 import accord.api.Key;
 import accord.api.DataStore;
 import accord.local.CommandStore.RangesForEpoch;
-import accord.messages.TxnRequest;
 import accord.api.ProgressLog;
-import accord.topology.KeyRanges;
+import accord.primitives.KeyRanges;
 import accord.topology.Topology;
-import accord.txn.Keys;
+import accord.primitives.Keys;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
@@ -196,7 +195,7 @@ public abstract class CommandStores
                 int index = ranges[i].rangeIndexForKey(scope);
                 if (index < 0)
                     continue;
-                result = addKeyIndex(scope, shards.length, result);
+                result = addKeyIndex(0, scope, shards.length, result);
             }
             return result;
         }
@@ -208,10 +207,10 @@ public abstract class CommandStores
 
         static long keyIndex(Key key, long numShards)
         {
-            return Integer.toUnsignedLong(key.keyHash()) % numShards;
+            return Integer.toUnsignedLong(key.routingHash()) % numShards;
         }
 
-        private static long addKeyIndex(Key key, long numShards, long accumulate)
+        private static long addKeyIndex(int i, Key key, long numShards, long accumulate)
         {
             return accumulate | (1L << keyIndex(key, numShards));
         }
index 465f387b4f5c456c12a5f2a7a7ab898d4e429ef7..b15f8bd06f9bc605da9f839ea58d0887318dff81 100644 (file)
@@ -5,7 +5,7 @@ import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.function.Consumer;
 
-import accord.txn.Timestamp;
+import accord.primitives.Timestamp;
 import com.google.common.collect.Iterators;
 
 public class CommandsForKey implements Listener, Iterable<Command>
index cb3444a98d1a1743452203d0ef0b49d9c50ced31..2cec60362a43f0cd48a57a44781262e058c719c8 100644 (file)
@@ -31,16 +31,16 @@ import accord.messages.CheckStatus.CheckStatusOk;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
 import accord.messages.Reply;
-import accord.topology.KeyRange;
-import accord.topology.KeyRanges;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
 import accord.topology.Shard;
 import accord.topology.Topology;
 import accord.topology.TopologyManager;
-import accord.txn.Ballot;
-import accord.txn.Keys;
-import accord.txn.Timestamp;
+import accord.primitives.Ballot;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import org.apache.cassandra.utils.concurrent.Future;
 
 public class Node implements ConfigurationService.Listener
@@ -411,7 +411,7 @@ public class Node implements ConfigurationService.Listener
     @VisibleForTesting
     public @Nullable Key trySelectHomeKey(TxnId txnId, Keys keys)
     {
-        int i = topology().localForEpoch(txnId.epoch).ranges().findFirstIntersecting(keys);
+        int i = topology().localForEpoch(txnId.epoch).ranges().findFirstKey(keys);
         return i >= 0 ? keys.get(i) : null;
     }
 
@@ -440,7 +440,7 @@ public class Node implements ConfigurationService.Listener
         if (topology.ranges().contains(homeKey))
             return homeKey;
 
-        int i = topology.ranges().findFirstIntersecting(keys);
+        int i = topology.ranges().findFirstKey(keys);
         if (i < 0)
             return null;
         return keys.get(i);
index b7b904df2e676377175a29b4baab6aea10c510f3..cc5b7e47cb1b1b29db056711cdc01850a8b704f5 100644 (file)
@@ -4,13 +4,13 @@ import accord.messages.TxnRequest.WithUnsynced;
 import accord.local.Node.Id;
 import accord.topology.Topologies;
 import accord.api.Key;
-import accord.txn.Ballot;
+import accord.primitives.Ballot;
 import accord.local.Node;
-import accord.txn.Timestamp;
+import accord.primitives.Timestamp;
 import accord.local.Command;
-import accord.txn.Dependencies;
+import accord.primitives.Deps;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 
 import static accord.messages.PreAccept.calculateDeps;
 
@@ -20,9 +20,9 @@ public class Accept extends WithUnsynced
     public final Key homeKey;
     public final Txn txn;
     public final Timestamp executeAt;
-    public final Dependencies deps;
+    public final Deps deps;
 
-    public Accept(Id to, Topologies topologies, Ballot ballot, TxnId txnId, Key homeKey, Txn txn, Timestamp executeAt, Dependencies deps)
+    public Accept(Id to, Topologies topologies, Ballot ballot, TxnId txnId, Key homeKey, Txn txn, Timestamp executeAt, Deps deps)
     {
         super(to, topologies, txn.keys, txnId);
         this.ballot = ballot;
@@ -50,8 +50,7 @@ public class Accept extends WithUnsynced
             AcceptOk ok2 = (AcceptOk) r2;
             if (ok1.deps.isEmpty()) return ok2;
             if (ok2.deps.isEmpty()) return ok1;
-            ok1.deps.addAll(ok2.deps);
-            return ok1;
+            return new AcceptOk(txnId, ok1.deps.with(ok2.deps));
         }));
     }
 
@@ -93,7 +92,7 @@ public class Accept extends WithUnsynced
         @Override
         public String toString()
         {
-            return "AcceptInvalidate{" + ballot + '}';
+            return "AcceptInvalidate{ballot:" + ballot + ", txnId:" + txnId + ", key:" + someKey + '}';
         }
 
         @Override
@@ -117,9 +116,9 @@ public class Accept extends WithUnsynced
     public static class AcceptOk implements AcceptReply
     {
         public final TxnId txnId;
-        public final Dependencies deps;
+        public final Deps deps;
 
-        public AcceptOk(TxnId txnId, Dependencies deps)
+        public AcceptOk(TxnId txnId, Deps deps)
         {
             this.txnId = txnId;
             this.deps = deps;
index 5310904d897ea1f4ebdb332dfe27b7081a767149..dc2145cc5f01acfe9b5123d459befd347f273014 100644 (file)
@@ -5,11 +5,11 @@ import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.Result;
 import accord.topology.Topologies;
-import accord.txn.Dependencies;
-import accord.txn.Timestamp;
+import accord.primitives.Deps;
+import accord.primitives.Timestamp;
 import accord.txn.Writes;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 
 import static accord.messages.MessageType.APPLY_REQ;
 import static accord.messages.MessageType.APPLY_RSP;
@@ -20,11 +20,11 @@ public class Apply extends TxnRequest
     public final Txn txn;
     protected final Key homeKey;
     public final Timestamp executeAt;
-    public final Dependencies deps;
+    public final Deps deps;
     public final Writes writes;
     public final Result result;
 
-    public Apply(Node.Id to, Topologies topologies, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Dependencies deps, Writes writes, Result result)
+    public Apply(Node.Id to, Topologies topologies, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Deps deps, Writes writes, Result result)
     {
         super(to, topologies, txn.keys);
         this.txnId = txnId;
index c2d91110761cc7f1f6ac83881e03a9dcbb2bc03a..407fe49162e5359bdbd0c425c1372a9e2f5acf5a 100644 (file)
@@ -9,11 +9,11 @@ import accord.local.Status;
 import accord.messages.BeginRecovery.RecoverNack;
 import accord.messages.BeginRecovery.RecoverOk;
 import accord.messages.BeginRecovery.RecoverReply;
-import accord.txn.Ballot;
-import accord.txn.Dependencies;
-import accord.txn.Timestamp;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.Timestamp;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import accord.txn.Writes;
 
 public class BeginInvalidate implements EpochRequest
@@ -70,7 +70,7 @@ public class BeginInvalidate implements EpochRequest
         public final Txn txn;
         public final Key homeKey;
 
-        public InvalidateOk(TxnId txnId, Status status, Ballot accepted, Timestamp executeAt, Dependencies deps, Writes writes, Result result, Txn txn, Key homeKey)
+        public InvalidateOk(TxnId txnId, Status status, Ballot accepted, Timestamp executeAt, Deps deps, Writes writes, Result result, Txn txn, Key homeKey)
         {
             super(txnId, status, accepted, executeAt, deps, null, null, false, writes, result);
             this.txn = txn;
index 0abb99123dc1de9a626f9472592069ad7f206b2c..ad883e29d464acdb3322c59b6ce4a7b303b75b18 100644 (file)
@@ -10,17 +10,17 @@ import java.util.stream.Stream;
 import accord.api.Key;
 import accord.local.CommandStore;
 import accord.local.CommandsForKey;
-import accord.txn.Keys;
+import accord.primitives.Keys;
 import accord.txn.Writes;
-import accord.txn.Ballot;
+import accord.primitives.Ballot;
 import accord.local.Node;
 import accord.local.Node.Id;
-import accord.txn.Timestamp;
+import accord.primitives.Timestamp;
 import accord.local.Command;
-import accord.txn.Dependencies;
+import accord.primitives.Deps;
 import accord.local.Status;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import com.google.common.base.Preconditions;
 
 import static accord.local.Status.Accepted;
@@ -55,16 +55,16 @@ public class BeginRecovery extends TxnRequest
             if (!command.recover(txn, homeKey, progressKey, ballot))
                 return new RecoverNack(command.promised());
 
-            Dependencies deps = command.status() == PreAccepted ? calculateDeps(instance, txnId, txn, txnId)
-                                                                : command.savedDeps();
+            Deps deps = command.status() == PreAccepted ? calculateDeps(instance, txnId, txn, txnId)
+                                                        : command.savedDeps();
 
             boolean rejectsFastPath;
-            Dependencies earlierCommittedWitness, earlierAcceptedNoWitness;
+            Deps earlierCommittedWitness, earlierAcceptedNoWitness;
 
             if (command.hasBeen(Committed))
             {
                 rejectsFastPath = false;
-                earlierCommittedWitness = earlierAcceptedNoWitness = new Dependencies();
+                earlierCommittedWitness = earlierAcceptedNoWitness = Deps.NONE;
             }
             else
             {
@@ -75,20 +75,15 @@ public class BeginRecovery extends TxnRequest
                     rejectsFastPath = committedExecutesAfter(instance, txnId, txn.keys)
                                          .anyMatch(c -> !c.savedDeps().contains(txnId));
 
+                // TODO: introduce some good unit tests for verifying these two functions in a real repair scenario
                 // committed txns with an earlier txnid and have our txnid as a dependency
-                earlierCommittedWitness = committedStartedBefore(instance, txnId, txn.keys)
-                                          .filter(c -> c.savedDeps().contains(txnId))
-                                          .collect(Dependencies::new, Dependencies::add, Dependencies::addAll);
+                earlierCommittedWitness = committedStartedBeforeAndDidWitness(instance, txnId, txn.keys);
 
                 // accepted txns with an earlier txnid that don't have our txnid as a dependency
-                earlierAcceptedNoWitness = uncommittedStartedBefore(instance, txnId, txn.keys)
-                                              .filter(c -> c.is(Accepted)
-                                                           && !c.savedDeps().contains(txnId)
-                                                           && c.executeAt().compareTo(txnId) > 0)
-                                              .collect(Dependencies::new, Dependencies::add, Dependencies::addAll);
+                earlierAcceptedNoWitness = acceptedStartedBeforeAndDidNotWitness(instance, txnId, txn.keys);
             }
             return new RecoverOk(txnId, command.status(), command.accepted(), command.executeAt(), deps, earlierCommittedWitness, earlierAcceptedNoWitness, rejectsFastPath, command.writes(), command.result());
-        }, (r1, r2) -> {
+        }, (r1, r2) -> { // TODO: reduce function that constructs a list to reduce at the end
             if (!r1.isOK()) return r1;
             if (!r2.isOK()) return r2;
             RecoverOk ok1 = (RecoverOk) r1;
@@ -123,27 +118,17 @@ public class BeginRecovery extends TxnRequest
             }
 
             // ok1 and ok2 both PreAccepted
-            Dependencies deps;
-            if (ok1.deps.equals(ok2.deps))
-            {
-                deps = ok1.deps;
-            }
-            else
-            {
-                deps = new Dependencies();
-                deps.addAll(ok1.deps);
-                deps.addAll(ok2.deps);
-            }
-            ok1.earlierCommittedWitness.addAll(ok2.earlierCommittedWitness);
-            ok1.earlierAcceptedNoWitness.addAll(ok2.earlierAcceptedNoWitness);
-            ok1.earlierAcceptedNoWitness.removeAll(ok1.earlierCommittedWitness);
+            Deps deps = ok1.deps.with(ok2.deps);
+            Deps earlierCommittedWitness = ok1.earlierCommittedWitness.with(ok2.earlierCommittedWitness);
+            Deps earlierAcceptedNoWitness = ok1.earlierAcceptedNoWitness.with(ok2.earlierAcceptedNoWitness)
+                                                                        .without(earlierCommittedWitness::contains);
             return new RecoverOk(
                     txnId, ok1.status,
                     Ballot.max(ok1.accepted, ok2.accepted),
                     Timestamp.max(ok1.executeAt, ok2.executeAt),
                     deps,
-                    ok1.earlierCommittedWitness,
-                    ok1.earlierAcceptedNoWitness,
+                    earlierCommittedWitness,
+                    earlierAcceptedNoWitness,
                     ok1.rejectsFastPath | ok2.rejectsFastPath,
                     ok1.writes, ok1.result);
         });
@@ -194,14 +179,14 @@ public class BeginRecovery extends TxnRequest
         public final Status status;
         public final Ballot accepted;
         public final Timestamp executeAt;
-        public final Dependencies deps;
-        public final Dependencies earlierCommittedWitness;  // counter-point to earlierAcceptedNoWitness
-        public final Dependencies earlierAcceptedNoWitness; // wait for these to commit
+        public final Deps deps;
+        public final Deps earlierCommittedWitness;  // counter-point to earlierAcceptedNoWitness
+        public final Deps earlierAcceptedNoWitness; // wait for these to commit
         public final boolean rejectsFastPath;
         public final Writes writes;
         public final Result result;
 
-        public RecoverOk(TxnId txnId, Status status, Ballot accepted, Timestamp executeAt, Dependencies deps, Dependencies earlierCommittedWitness, Dependencies earlierAcceptedNoWitness, boolean rejectsFastPath, Writes writes, Result result)
+        public RecoverOk(TxnId txnId, Status status, Ballot accepted, Timestamp executeAt, Deps deps, Deps earlierCommittedWitness, Deps earlierAcceptedNoWitness, boolean rejectsFastPath, Writes writes, Result result)
         {
             this.txnId = txnId;
             this.accepted = accepted;
@@ -289,24 +274,44 @@ public class BeginRecovery extends TxnRequest
         }
     }
 
-    private static Stream<Command> uncommittedStartedBefore(CommandStore commandStore, TxnId startedBefore, Keys keys)
+    private static Deps acceptedStartedBeforeAndDidNotWitness(CommandStore commandStore, TxnId txnId, Keys keys)
     {
-        return keys.stream().flatMap(key -> {
-            CommandsForKey forKey = commandStore.maybeCommandsForKey(key);
-            if (forKey == null)
-                return Stream.of();
-            return forKey.uncommitted.headMap(startedBefore, false).values().stream();
-        });
+        try (Deps.OrderedBuilder builder = Deps.orderedBuilder(true);)
+        {
+            keys.forEach(key -> {
+                CommandsForKey forKey = commandStore.maybeCommandsForKey(key);
+                if (forKey == null)
+                    return;
+
+                // accepted txns with an earlier txnid that don't have our txnid as a dependency
+                builder.nextKey(key);
+                forKey.uncommitted.headMap(txnId, false).forEach((ts, command) -> {
+                    if (command.is(Accepted) && !command.savedDeps().contains(txnId) && command.executeAt().compareTo(txnId) > 0)
+                        builder.add(command.txnId());
+                });
+            });
+            return builder.build();
+        }
     }
 
-    private static Stream<Command> committedStartedBefore(CommandStore commandStore, TxnId startedBefore, Keys keys)
+    private static Deps committedStartedBeforeAndDidWitness(CommandStore commandStore, TxnId txnId, Keys keys)
     {
-        return keys.stream().flatMap(key -> {
-            CommandsForKey forKey = commandStore.maybeCommandsForKey(key);
-            if (forKey == null)
-                return Stream.of();
-            return forKey.committedById.headMap(startedBefore, false).values().stream();
-        });
+        try (Deps.OrderedBuilder builder = Deps.orderedBuilder(true);)
+        {
+            keys.forEach(key -> {
+                CommandsForKey forKey = commandStore.maybeCommandsForKey(key);
+                if (forKey == null)
+                    return;
+
+                // committed txns with an earlier txnid and have our txnid as a dependency
+                builder.nextKey(key);
+                forKey.committedById.headMap(txnId, false).forEach((ts, command) -> {
+                    if (command.savedDeps().contains(txnId))
+                        builder.add(command.txnId());
+                });
+            });
+            return builder.build();
+        }
     }
 
     private static Stream<Command> uncommittedStartedAfter(CommandStore commandStore, TxnId startedAfter, Keys keys)
index 245d854c49f3f3c2d5c2c283e77ca2b9b2585f2e..ca99dde22d8348767da905e98135e982d3463e3d 100644 (file)
@@ -11,5 +11,5 @@ public interface Callback<T>
     void onSuccess(Id from, T response);
     default void onSlowResponse(Id from) {}
     void onFailure(Id from, Throwable failure);
-    void onCallbackFailure(Throwable failure);
+    void onCallbackFailure(Id from, Throwable failure);
 }
index 5eda642959baaa3e8868c88d9912801099a05181..d66dc510847dce845ced9bce8ff547412f599a3a 100644 (file)
@@ -6,11 +6,11 @@ import accord.local.Command;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.Status;
-import accord.txn.Ballot;
-import accord.txn.Dependencies;
-import accord.txn.Timestamp;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.Timestamp;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import accord.txn.Writes;
 
 public class CheckStatus implements Request
@@ -165,12 +165,12 @@ public class CheckStatus implements Request
         public final Txn txn;
         public final Key homeKey;
         public final Timestamp executeAt;
-        public final Dependencies deps;
+        public final Deps deps;
         public final Writes writes;
         public final Result result;
 
         CheckStatusOkFull(Status status, Ballot promised, Ballot accepted, boolean isCoordinating, boolean hasExecutedOnAllShards,
-                          Txn txn, Key homeKey, Timestamp executeAt, Dependencies deps, Writes writes, Result result)
+                          Txn txn, Key homeKey, Timestamp executeAt, Deps deps, Writes writes, Result result)
         {
             super(status, promised, accepted, isCoordinating, hasExecutedOnAllShards);
             this.txn = txn;
index f69bd0ed4f4ddb2dfad057a45b6ed58e190e81ff..a4d67ba046bd53218ae8aaee49a49ae1d8c188c0 100644 (file)
@@ -3,25 +3,23 @@ package accord.messages;
 import java.util.Collections;
 import java.util.Set;
 
-import com.google.common.base.Preconditions;
-
 import accord.api.Key;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.topology.Topologies;
-import accord.txn.Keys;
-import accord.txn.Timestamp;
-import accord.txn.Dependencies;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
+import accord.primitives.Deps;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 
 // TODO: CommitOk responses, so we can send again if no reply received? Or leave to recovery?
 public class Commit extends ReadData
 {
-    public final Dependencies deps;
+    public final Deps deps;
     public final boolean read;
 
-    public Commit(Id to, Topologies topologies, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Dependencies deps, boolean read)
+    public Commit(Id to, Topologies topologies, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Deps deps, boolean read)
     {
         super(to, topologies, txnId, txn, homeKey, executeAt);
         this.deps = deps;
@@ -29,7 +27,7 @@ public class Commit extends ReadData
     }
 
     // TODO: accept Topology not Topologies
-    public static void commitAndRead(Node node, Topologies executeTopologies, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Dependencies deps, Set<Id> readSet, Callback<ReadReply> callback)
+    public static void commitAndRead(Node node, Topologies executeTopologies, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Deps deps, Set<Id> readSet, Callback<ReadReply> callback)
     {
         for (Node.Id to : executeTopologies.nodes())
         {
@@ -45,7 +43,7 @@ public class Commit extends ReadData
         }
     }
 
-    public static void commit(Node node, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Dependencies deps)
+    public static void commit(Node node, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Deps deps)
     {
         Topologies commitTo = node.topology().preciseEpochs(txn, txnId.epoch, executeAt.epoch);
         for (Node.Id to : commitTo.nodes())
@@ -55,7 +53,7 @@ public class Commit extends ReadData
         }
     }
 
-    public static void commit(Node node, Topologies commitTo, Set<Id> doNotCommitTo, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Dependencies deps)
+    public static void commit(Node node, Topologies commitTo, Set<Id> doNotCommitTo, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Deps deps)
     {
         for (Node.Id to : commitTo.nodes())
         {
@@ -67,7 +65,7 @@ public class Commit extends ReadData
         }
     }
 
-    public static void commit(Node node, Topologies commitTo, Topologies appliedTo, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Dependencies deps)
+    public static void commit(Node node, Topologies commitTo, Topologies appliedTo, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Deps deps)
     {
         // TODO: if we switch to Topology rather than Topologies we can avoid sending commits to nodes that Apply the same
         commit(node, commitTo, Collections.emptySet(), txnId, txn, homeKey, executeAt, deps);
index 1ea3ffdc546312197f0466b439553a67b52095ab..f86eaea6d297f40e9996aaf03d747f973b8c1ae3 100644 (file)
@@ -5,8 +5,8 @@ import java.util.Set;
 import accord.api.Key;
 import accord.local.Node;
 import accord.local.Node.Id;
-import accord.txn.Timestamp;
-import accord.txn.TxnId;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
 
 import static accord.messages.InformOfTxn.InformOfTxnNack.nack;
 import static accord.messages.InformOfTxn.InformOfTxnOk.ok;
index 3c7bbf9ad74ec3c2b494f140bd23aa79c8e24785..d04579ae5e369dc2301ac3fd9293afaa759d4797 100644 (file)
@@ -4,7 +4,7 @@ import accord.api.Key;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 
 import static accord.messages.InformOfTxn.InformOfTxnNack.nack;
 import static accord.messages.InformOfTxn.InformOfTxnOk.ok;
index c1d704d0a7ca40aab46bfd5cadc0c9e6223b8a12..769d7172665ff744ce66ab07e7eee59e9af0cd13 100644 (file)
@@ -1,7 +1,7 @@
 package accord.messages;
 
-import java.util.Objects;
-import java.util.stream.Stream;
+import java.util.*;
+import java.util.function.BiConsumer;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -12,12 +12,12 @@ import accord.local.Node;
 import accord.local.Node.Id;
 import accord.messages.TxnRequest.WithUnsynced;
 import accord.topology.Topologies;
-import accord.txn.Keys;
-import accord.txn.Timestamp;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
 import accord.local.Command;
-import accord.txn.Dependencies;
+import accord.primitives.Deps;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 
 public class PreAccept extends WithUnsynced
 {
@@ -60,9 +60,10 @@ public class PreAccept extends WithUnsynced
             PreAcceptOk ok1 = (PreAcceptOk) r1;
             PreAcceptOk ok2 = (PreAcceptOk) r2;
             PreAcceptOk okMax = ok1.witnessedAt.compareTo(ok2.witnessedAt) >= 0 ? ok1 : ok2;
-            if (ok1 != okMax && !ok1.deps.isEmpty()) okMax.deps.addAll(ok1.deps);
-            if (ok2 != okMax && !ok2.deps.isEmpty()) okMax.deps.addAll(ok2.deps);
-            return okMax;
+            Deps deps = ok1.deps.with(ok2.deps);
+            if (deps == okMax.deps)
+                return okMax;
+            return new PreAcceptOk(txnId, okMax.witnessedAt, deps);
         }));
     }
 
@@ -87,9 +88,9 @@ public class PreAccept extends WithUnsynced
     {
         public final TxnId txnId;
         public final Timestamp witnessedAt;
-        public final Dependencies deps;
+        public final Deps deps;
 
-        public PreAcceptOk(TxnId txnId, Timestamp witnessedAt, Dependencies deps)
+        public PreAcceptOk(TxnId txnId, Timestamp witnessedAt, Deps deps)
         {
             this.txnId = txnId;
             this.witnessedAt = witnessedAt;
@@ -147,17 +148,22 @@ public class PreAccept extends WithUnsynced
         }
     }
 
-    static Dependencies calculateDeps(CommandStore commandStore, TxnId txnId, Txn txn, Timestamp executeAt)
+    static Deps calculateDeps(CommandStore commandStore, TxnId txnId, Txn txn, Timestamp executeAt)
     {
-        Dependencies deps = new Dependencies();
-        conflictsMayExecuteBefore(commandStore, executeAt, txn.keys).forEach(conflict -> {
-            if (conflict.txnId().equals(txnId))
-                return;
-
-            if (txn.isWrite() || conflict.txn().isWrite())
-                deps.add(conflict);
-        });
-        return deps;
+        try (Deps.OrderedBuilder builder = Deps.orderedBuilder(false);)
+        {
+            txn.keys.forEach(key -> {
+                CommandsForKey forKey = commandStore.maybeCommandsForKey(key);
+                if (forKey == null)
+                    return;
+
+                builder.nextKey(key);
+                forKey.uncommitted.headMap(executeAt, false).forEach(conflicts(txnId, txn.isWrite(), builder));
+                forKey.committedByExecuteAt.headMap(executeAt, false).forEach(conflicts(txnId, txn.isWrite(), builder));
+            });
+
+            return builder.build();
+        }
     }
 
     @Override
@@ -170,19 +176,11 @@ public class PreAccept extends WithUnsynced
                '}';
     }
 
-    private static Stream<Command> conflictsMayExecuteBefore(CommandStore commandStore, Timestamp mayExecuteBefore, Keys keys)
+    private static BiConsumer<Timestamp, Command> conflicts(TxnId txnId, boolean isWrite, Deps.OrderedBuilder builder)
     {
-        return keys.stream().flatMap(key -> {
-            CommandsForKey forKey = commandStore.maybeCommandsForKey(key);
-            if (forKey == null)
-                return Stream.of();
-
-            return Stream.concat(
-                forKey.uncommitted.headMap(mayExecuteBefore, false).values().stream(),
-                // TODO: only return latest of Committed?
-                forKey.committedByExecuteAt.headMap(mayExecuteBefore, false).values().stream()
-            );
-        });
+        return (ts, command) -> {
+            if (!txnId.equals(command.txnId()) && (isWrite || command.txn().isWrite()))
+                builder.add(command.txnId());
+        };
     }
-
 }
index 76df7aab4dd1fd5d6352860346da302ef5ecf4aa..3986417b6596351076dac8fa0038f3fbb4a55680 100644 (file)
@@ -9,10 +9,10 @@ import accord.local.*;
 import accord.local.Node.Id;
 import accord.api.Data;
 import accord.topology.Topologies;
-import accord.txn.Keys;
-import accord.txn.Timestamp;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
 import accord.txn.Txn;
-import accord.txn.TxnId;
+import accord.primitives.TxnId;
 import accord.utils.DeterministicIdentitySet;
 
 public class ReadData extends TxnRequest
@@ -166,6 +166,12 @@ public class ReadData extends TxnRequest
         {
             return false;
         }
+
+        @Override
+        public String toString()
+        {
+            return "ReadNack";
+        }
     }
 
     public static class ReadOk extends ReadReply
index c18bd9f664c436cd189b76c422ab5bfb9bae2c1b..1a84cac3e10ebc9ea9b33219cc58032ac4d037e3 100644 (file)
@@ -6,11 +6,11 @@ import com.google.common.base.Preconditions;
 import accord.api.Key;
 import accord.local.Node;
 import accord.local.Node.Id;
-import accord.topology.KeyRanges;
+import accord.primitives.KeyRanges;
 import accord.topology.Topologies;
 import accord.topology.Topology;
-import accord.txn.Keys;
-import accord.txn.TxnId;
+import accord.primitives.Keys;
+import accord.primitives.TxnId;
 
 import static java.lang.Long.min;
 
@@ -186,7 +186,7 @@ public abstract class TxnRequest implements EpochRequest
             Topology topology = topologies.get(i);
             KeyRanges ranges = topology.rangesForNode(node);
             if (ranges != last && ranges != null && !ranges.equals(last))
-                scopeKeys = scopeKeys.union(keys.intersect(ranges));
+                scopeKeys = scopeKeys.union(keys.slice(ranges));
 
             last = ranges;
         }
index ade17d7c7e5fd4434ab99a2396b12cae0f8e04e1..ecf16b521d81a3350dcfd4cea6867fde51af84e0 100644 (file)
@@ -7,8 +7,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import accord.local.*;
 import accord.local.Node.Id;
 import accord.topology.Topologies;
-import accord.txn.TxnId;
-import accord.txn.Keys;
+import accord.primitives.TxnId;
+import accord.primitives.Keys;
 
 public class WaitOnCommit extends TxnRequest
 {
@@ -58,7 +58,7 @@ public class WaitOnCommit extends TxnRequest
                 node.reply(replyToNode, replyContext, WaitOnCommitOk.INSTANCE);
         }
 
-        void setup(CommandStore instance)
+        void setup(Keys keys, CommandStore instance)
         {
             Command command = instance.command(txnId);
             switch (command.status())
@@ -68,6 +68,7 @@ public class WaitOnCommit extends TxnRequest
                 case Accepted:
                 case AcceptedInvalidate:
                     command.addListener(this);
+                    instance.progressLog().waiting(txnId, keys);
                     break;
 
                 case Committed:
@@ -83,7 +84,7 @@ public class WaitOnCommit extends TxnRequest
         {
             List<CommandStore> instances = node.collectLocal(keys, txnId, ArrayList::new);
             waitingOn.set(instances.size());
-            instances.forEach(instance -> instance.processBlocking(this::setup));
+            instances.forEach(instance -> instance.processBlocking(ignore -> setup(keys, instance)));
         }
     }
 
similarity index 92%
rename from accord-core/src/main/java/accord/txn/Ballot.java
rename to accord-core/src/main/java/accord/primitives/Ballot.java
index d2895210f6212a257bf2c56118d85afe31acccb5..dfc472d9f2b1b9d9aa33fae470ccce07967aa3bf 100644 (file)
@@ -1,4 +1,4 @@
-package accord.txn;
+package accord.primitives;
 
 import accord.local.Node.Id;
 
diff --git a/accord-core/src/main/java/accord/primitives/Deps.java b/accord-core/src/main/java/accord/primitives/Deps.java
new file mode 100644 (file)
index 0000000..2f87e82
--- /dev/null
@@ -0,0 +1,1137 @@
+package accord.primitives;
+
+import java.util.*;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import java.util.stream.Collectors;
+
+import accord.api.Key;
+import accord.utils.SortedArrays;
+import com.google.common.base.Preconditions;
+
+import static accord.utils.ArrayBuffers.*;
+import static accord.utils.SortedArrays.*;
+
+/**
+ * A collection of dependencies for a transaction, organised by the key the dependency is adopted via.
+ * An inverse map from TxnId to Key may also be constructed and stored in this collection.
+ */
+public class Deps implements Iterable<Map.Entry<Key, TxnId>>
+{
+    private static final boolean DEBUG_CHECKS = true;
+
+    private static final TxnId[] NO_TXNIDS = new TxnId[0];
+    private static final int[] NO_INTS = new int[0];
+    public static final Deps NONE = new Deps(Keys.EMPTY, NO_TXNIDS, NO_INTS);
+
+    public static Deps none(Keys keys)
+    {
+        int[] keysToTxnId = new int[keys.size()];
+        Arrays.fill(keysToTxnId, keys.size());
+        return new Deps(keys, NO_TXNIDS, keysToTxnId);
+    }
+
+    /**
+     * Expects keys in ascending order; TxnIds within a key must be unique, and ascending if hasOrderedTxnId is true
+     */
+    public static OrderedBuilder orderedBuilder(boolean hasOrderedTxnId)
+    {
+        return new OrderedBuilder(hasOrderedTxnId);
+    }
+
+    public static class OrderedBuilder implements AutoCloseable
+    {
+        final boolean hasOrderedTxnId;
+        Key[] keys;
+        int[] keyLimits;
+        // TxnIds grouped by key; keyLimits[k] is the exclusive end offset of the k'th key's TxnIds
+        TxnId[] keyToTxnId;
+        int keyCount;
+        int keyOffset;
+        int totalCount;
+
+        public OrderedBuilder(boolean hasOrderedTxnId)
+        {
+            this.keys = cachedKeys().get(16);
+            this.keyLimits = cachedInts().getInts(keys.length);
+            this.hasOrderedTxnId = hasOrderedTxnId;
+            this.keyToTxnId = cachedTxnIds().get(16);
+        }
+
+        public boolean isEmpty()
+        {
+            return totalCount() == 0;
+        }
+
+        private int totalCount()
+        {
+            return totalCount;
+        }
+
+        public void nextKey(Key key)
+        {
+            if (keyCount > 0 && keys[keyCount - 1].compareTo(key) >= 0)
+            {
+                throw new IllegalArgumentException("Key " + key + " has already been visited or was provided out of order ("
+                        + Arrays.toString(Arrays.copyOf(keys, keyCount)) + ")");
+            }
+
+            finishKey();
+
+            if (keyCount == keys.length)
+            {
+                Key[] newKeys = cachedKeys().get(keyCount * 2);
+                System.arraycopy(keys, 0, newKeys, 0, keyCount);
+                cachedKeys().forceDiscard(keys, keyCount);
+                keys = newKeys;
+                int[] newKeyLimits = cachedInts().getInts(keyCount * 2);
+                System.arraycopy(keyLimits, 0, newKeyLimits, 0, keyCount);
+                cachedInts().forceDiscard(keyLimits);
+                keyLimits = newKeyLimits;
+            }
+            keys[keyCount++] = key;
+        }
+
+        private void finishKey()
+        {
+            if (totalCount == keyOffset && keyCount > 0)
+                --keyCount; // remove this key; no data
+
+            if (keyCount == 0)
+                return;
+
+            if (totalCount != keyOffset && !hasOrderedTxnId)
+            {
+                Arrays.sort(keyToTxnId, keyOffset, totalCount);
+                for (int i = keyOffset + 1 ; i < totalCount ; ++i)
+                {
+                    if (keyToTxnId[i - 1].equals(keyToTxnId[i]))
+                        throw new IllegalArgumentException("TxnId for " + keys[keyCount - 1] + " are not unique: " + Arrays.asList(keyToTxnId).subList(keyOffset, totalCount));
+                }
+            }
+
+            keyLimits[keyCount - 1] = totalCount;
+            keyOffset = totalCount;
+        }
+
+        public void add(Key key, TxnId txnId)
+        {
+            if (keyCount == 0 || !keys[keyCount - 1].equals(key))
+                nextKey(key);
+            add(txnId);
+        }
+
+        /**
+         * Add this TxnId as a dependency of the current key (the key most recently passed to nextKey)
+         */
+        public void add(TxnId txnId)
+        {
+            if (hasOrderedTxnId && totalCount > keyOffset && keyToTxnId[totalCount - 1].compareTo(txnId) >= 0)
+                throw new IllegalArgumentException("TxnId provided out of order");
+
+            if (totalCount >= keyToTxnId.length)
+            {
+                TxnId[] newTxnIds = cachedTxnIds().get(keyToTxnId.length * 2);
+                System.arraycopy(keyToTxnId, 0, newTxnIds, 0, totalCount);
+                cachedTxnIds().forceDiscard(keyToTxnId, totalCount);
+                keyToTxnId = newTxnIds;
+            }
+
+            keyToTxnId[totalCount++] = txnId;
+        }
+
+        public Deps build()
+        {
+            if (totalCount == 0)
+                return NONE;
+
+            finishKey();
+
+            TxnId[] uniqueTxnId = cachedTxnIds().get(totalCount);
+            System.arraycopy(keyToTxnId, 0, uniqueTxnId, 0, totalCount);
+            Arrays.sort(uniqueTxnId, 0, totalCount);
+            int txnIdCount = 1;
+            for (int i = 1 ; i < totalCount ; ++i)
+            {
+                if (!uniqueTxnId[txnIdCount - 1].equals(uniqueTxnId[i]))
+                    uniqueTxnId[txnIdCount++] = uniqueTxnId[i];
+            }
+
+            TxnId[] txnIds = cachedTxnIds().complete(uniqueTxnId, txnIdCount);
+            cachedTxnIds().discard(uniqueTxnId, totalCount);
+
+            int[] result = new int[keyCount + totalCount];
+            int offset = keyCount;
+            for (int k = 0 ; k < keyCount ; ++k)
+            {
+                result[k] = keyCount + keyLimits[k];
+                int from = k == 0 ? 0 : keyLimits[k - 1];
+                int to = keyLimits[k];
+                offset = (int)SortedArrays.foldlIntersection(txnIds, 0, txnIdCount, keyToTxnId, from, to, (li, ri, key, p, v) -> {
+                    result[(int)v] = li;
+                    return v + 1;
+                }, keyCount, offset, -1);
+            }
+
+            return new Deps(Keys.ofSortedUnchecked(cachedKeys().complete(keys, keyCount)), txnIds, result);
+        }
+
+        @Override
+        public void close()
+        {
+            cachedKeys().discard(keys, keyCount);
+            cachedInts().forceDiscard(keyLimits);
+            cachedTxnIds().forceDiscard(keyToTxnId, totalCount);
+        }
+    }
+
+    /**
+     * An object for managing a sequence of efficient linear merges of Deps objects.
+     * Its primary purpose is to manage input and output buffers, so that we reuse output buffers
+     * as input to the next merge, and so that if any input is a superset of the other inputs then this input
+     * is returned unmodified.
+     *
+     * This is achieved by using PassThroughXBuffers so that the result buffers (and their sizes) are returned
+     * unmodified, and the buffers are cached as far as possible. In general, the buffers should be taken
+     * out of pre-existing caches, but if the buffers are too large then we cache any additional buffers we
+     * allocate for the duration of the merge.
+     */
+    private static class LinearMerger extends PassThroughObjectAndIntBuffers<TxnId> implements DepsConstructor<Key, TxnId, Object>
+    {
+        final PassThroughObjectBuffers<Key> keyBuffers;
+        Key[] bufKeys;
+        TxnId[] bufTxnIds;
+        int[] buf = null;
+        int bufKeysLength, bufTxnIdsLength = 0, bufLength = 0;
+        Deps from = null;
+
+        LinearMerger()
+        {
+            super(cachedTxnIds(), cachedInts());
+            keyBuffers = new PassThroughObjectBuffers<>(cachedKeys());
+        }
+
+        @Override
+        public Object construct(Key[] keys, int keysLength, TxnId[] txnIds, int txnIdsLength, int[] out, int outLength)
+        {
+            if (from == null)
+            {
+                // if our input buffers were themselves buffers, we want to discard them unless they have been returned back to us
+                discard(keys, txnIds, out);
+            }
+            else if (buf != out)
+            {
+                // the output is not equal to a prior input
+                from = null;
+            }
+
+            if (from == null)
+            {
+                bufKeys = keys;
+                bufKeysLength = keysLength;
+                bufTxnIds = txnIds;
+                bufTxnIdsLength = txnIdsLength;
+                buf = out;
+                bufLength = outLength;
+            }
+            else
+            {
+                Preconditions.checkState(keys == bufKeys && keysLength == bufKeysLength);
+                Preconditions.checkState(txnIds == bufTxnIds && txnIdsLength == bufTxnIdsLength);
+                Preconditions.checkState(outLength == bufLength);
+            }
+            return null;
+        }
+
+        void update(Deps deps)
+        {
+            if (buf == null)
+            {
+                bufKeys = deps.keys.keys;
+                bufKeysLength = deps.keys.keys.length;
+                bufTxnIds = deps.txnIds;
+                bufTxnIdsLength = deps.txnIds.length;
+                buf = deps.keyToTxnId;
+                bufLength = deps.keyToTxnId.length;
+                from = deps;
+                return;
+            }
+
+            linearUnion(
+                    bufKeys, bufKeysLength, bufTxnIds, bufTxnIdsLength, buf, bufLength,
+                    deps.keys.keys, deps.keys.keys.length, deps.txnIds, deps.txnIds.length, deps.keyToTxnId, deps.keyToTxnId.length,
+                    keyBuffers, this, this, this
+            );
+            if (buf == deps.keyToTxnId)
+            {
+                Preconditions.checkState(deps.keys.keys == bufKeys && deps.keys.keys.length == bufKeysLength);
+                Preconditions.checkState(deps.txnIds == bufTxnIds && deps.txnIds.length == bufTxnIdsLength);
+                Preconditions.checkState(deps.keyToTxnId.length == bufLength);
+                from = deps;
+            }
+        }
+
+        Deps get()
+        {
+            if (buf == null)
+                return NONE;
+
+            if (from != null)
+                return from;
+
+            return new Deps(
+                    Keys.ofSortedUnchecked(keyBuffers.realComplete(bufKeys, bufKeysLength)),
+                    realComplete(bufTxnIds, bufTxnIdsLength),
+                    realComplete(buf, bufLength));
+        }
+
+        /**
+         * Free any buffers we no longer need
+         */
+        void discard()
+        {
+            if (from == null)
+                discard(null, null, null);
+        }
+
+        /**
+         * Free buffers unless they are equal to the corresponding parameter
+         */
+        void discard(Key[] freeKeysIfNot, TxnId[] freeTxnIdsIfNot, int[] freeBufIfNot)
+        {
+            if (from != null)
+                return;
+
+            if (bufKeys != freeKeysIfNot)
+            {
+                keyBuffers.realDiscard(bufKeys, bufKeysLength);
+                bufKeys = null;
+            }
+            if (bufTxnIds != freeTxnIdsIfNot)
+            {
+                realDiscard(bufTxnIds, bufTxnIdsLength);
+                bufTxnIds = null;
+            }
+            if (buf != freeBufIfNot)
+            {
+                realDiscard(buf, bufLength);
+                buf = null;
+            }
+        }
+    }
+
+    public static <T> Deps merge(List<T> merge, Function<T, Deps> getter)
+    {
+        LinearMerger linearMerger = new LinearMerger();
+        try
+        {
+            int mergeIndex = 0, mergeSize = merge.size();
+            while (mergeIndex < mergeSize)
+            {
+                Deps deps = getter.apply(merge.get(mergeIndex++));
+                if (deps == null || deps.isEmpty())
+                    continue;
+
+                linearMerger.update(deps);
+            }
+
+            return linearMerger.get();
+        }
+        finally
+        {
+            linearMerger.discard();
+        }
+    }
+
+    final Keys keys; // unique Keys
+    final TxnId[] txnIds; // unique TxnId TODO: this should perhaps be a BTree?
+
+    /**
+     * This represents a map of {@code Key -> [TxnId] } where each TxnId is actually a pointer into the txnIds array.
+     * The beginning of the array (the first keys.size() entries) are offsets into this array.
+     * <p/>
+     * Example:
+     * <p/>
+     * {@code
+     *   int keyIdx = keys.indexOf(key);
+     *   int startOfTxnOffset = keyIdx == 0 ? keys.size() : keyToTxnId[keyIdx - 1];
+     *   int endOfTxnOffset = keyToTxnId[keyIdx];
+     *   for (int i = startOfTxnOffset; i < endOfTxnOffset; i++)
+     *   {
+     *       TxnId id = txnIds[keyToTxnId[i]]
+     *       ...
+     *   }
+     * }
+     */
+    final int[] keyToTxnId; // Key -> [TxnId]
+    // Lazy loaded in ensureTxnIdToKey()
+    int[] txnIdToKey; // TxnId -> [Key]
+
+    /**
+     * Wraps the given arrays without copying them.
+     *
+     * @throws IllegalArgumentException if there is at least one key and the end offset stored for the
+     *                                  last key is not the length of {@code keyToTxnId}
+     */
+    Deps(Keys keys, TxnId[] txnIds, int[] keyToTxnId)
+    {
+        this.keys = keys;
+        this.txnIds = txnIds;
+        this.keyToTxnId = keyToTxnId;
+
+        if (!keys.isEmpty())
+        {
+            int lastKey = keys.size() - 1;
+            if (keyToTxnId[lastKey] != keyToTxnId.length)
+                throw new IllegalArgumentException(String.format("Last key (%s) in keyToTxnId does not point (%d) to the end of the array (%d);\nkeyToTxnId=%s", keys.get(lastKey), keyToTxnId[lastKey], keyToTxnId.length, Arrays.toString(keyToTxnId)));
+        }
+
+        if (DEBUG_CHECKS)
+            checkValid();
+    }
+
+    public Deps slice(KeyRanges ranges)
+    {
+        if (isEmpty())
+            return this;
+
+        Keys select = keys.slice(ranges);
+
+        if (select.size() == keys.size())
+            return this;
+
+        return selectInternal(select);
+    }
+
+    /**
+     * @param select may include keys not present in this {@code Deps}
+     * @return those parts of this Deps relating to the provided keys
+     */
+    private Deps selectInternal(Keys select)
+    {
+        if (select.isEmpty())
+            return NONE;
+
+        int i = 0;
+        int offset = select.size();
+        for (int j = 0 ; j < select.size() ; ++j)
+        {
+            int findi = keys.findNext(select.get(j), i);
+            if (findi < 0)
+                continue;
+
+            i = findi;
+            offset += keyToTxnId[i] - (i == 0 ? keys.size() : keyToTxnId[i - 1]);
+        }
+
+        int[] src = keyToTxnId;
+        int[] trg = new int[offset];
+
+        i = 0;
+        offset = select.size();
+        for (int j = 0 ; j < select.size() ; ++j)
+        {
+            int findi = keys.findNext(select.get(j), i);
+            if (findi >= 0)
+            {
+                i = findi;
+                int start = i == 0 ? keys.size() : src[i - 1];
+                int count = src[i] - start;
+                System.arraycopy(src, start, trg, offset, count);
+                offset += count;
+            }
+            trg[j] = offset;
+        }
+
+        TxnId[] txnIds = trimUnusedTxnId(select, this.txnIds, trg);
+        return new Deps(select, txnIds, trg);
+    }
+
+    /**
+     * Returns the set of {@link TxnId}s that are referenced by {@code keysToTxnId}, and <strong>updates</strong>
+     * {@code keysToTxnId} to point to the new offsets in the returned set.
+     * @param keys object referenced by {@code keysToTxnId} index
+     * @param txnIds to trim to the seen {@link TxnId}s
+     * @param keysToTxnId to use as reference for trimming, this index will be updated to reflect the trimmed offsets.
+     * @return smallest set of {@link TxnId} seen in {@code keysToTxnId}
+     */
+    private static TxnId[] trimUnusedTxnId(Keys keys, TxnId[] txnIds, int[] keysToTxnId)
+    {
+        int[] remapTxnId = new int[txnIds.length];
+        // on init all values got set to 0, so use 1 to define that the id exists in the index
+        for (int i = keys.size() ; i < keysToTxnId.length ; ++i)
+            remapTxnId[keysToTxnId[i]] = 1;
+
+        int offset = 0;
+        for (int i = 0 ; i < remapTxnId.length ; ++i)
+        {
+            if (remapTxnId[i] == 1) remapTxnId[i] = offset++;
+            else remapTxnId[i] = -1;
+        }
+
+        TxnId[] result = txnIds;
+        if (offset < remapTxnId.length)
+        {
+            result = new TxnId[offset];
+            for (int i = 0 ; i < txnIds.length ; ++i)
+            {
+                if (remapTxnId[i]>= 0)
+                    result[remapTxnId[i]] = txnIds[i];
+            }
+            // Update keysToTxnId to point to the new remapped TxnId offsets
+            for (int i = keys.size() ; i < keysToTxnId.length ; ++i)
+                keysToTxnId[i] = remapTxnId[keysToTxnId[i]];
+        }
+
+        return result;
+    }
+
+    public Deps with(Deps that)
+    {
+        if (isEmpty() || that.isEmpty())
+            return isEmpty() ? that : this;
+
+        return linearUnion(
+                this.keys.keys, this.keys.keys.length, this.txnIds, this.txnIds.length, this.keyToTxnId, this.keyToTxnId.length,
+                that.keys.keys, that.keys.keys.length, that.txnIds, that.txnIds.length, that.keyToTxnId, that.keyToTxnId.length,
+                cachedKeys(), cachedTxnIds(), cachedInts(),
+                (keys, keysLength, txnIds, txnIdsLength, out, outLength) ->
+                        new Deps(Keys.ofSortedUnchecked(cachedKeys().complete(keys, keysLength)),
+                                cachedTxnIds().complete(txnIds, txnIdsLength),
+                                cachedInts().complete(out, outLength))
+                );
+    }
+
+    /**
+     * Turn a set of key, value and mapping buffers into a merge result;
+     * K and V are either Key and TxnId, or vice versa, depending on which mapping direction was present
+     *
+     * @param <K> type of the elements of the key array
+     * @param <V> type of the elements of the value array
+     * @param <T> type of the constructed result
+     */
+    interface DepsConstructor<K, V, T>
+    {
+        /**
+         * @param keys         the key array; only the first {@code keysLength} elements are meaningful
+         * @param keysLength   number of keys
+         * @param values       the value array; only the first {@code valuesLength} elements are meaningful
+         * @param valuesLength number of values
+         * @param out          the key-to-value mapping array; only the first {@code outLength} elements are meaningful
+         * @param outLength    number of used entries in {@code out}
+         * @return the merge result
+         */
+        T construct(K[] keys, int keysLength, V[] values, int valuesLength, int[] out, int outLength);
+    }
+
+    // TODO: this method supports merging keyToTxnId OR txnIdToKey; we can perhaps save time and effort when constructing
+    //       Deps on remote hosts by only producing txnIdToKey with OrderedCollector and serializing only this,
+    //       and merging on the recipient before inverting, so that we only have to invert the final assembled deps
+    /**
+     * Unions two key -> [value] mappings, each given as a key array, a value array and an int[] mapping in the
+     * same layout as {@code keyToTxnId}: one end offset per key, followed by the per-key lists of indexes
+     * into the value array.
+     *
+     * If one input turns out to already contain everything in the other, that input's own arrays are passed
+     * to {@code constructor} unchanged; otherwise the merged buffers are passed. Every buffer obtained here
+     * is handed to the matching {@code discard}/{@code forceDiscard} method before returning.
+     *
+     * NOTE(review): {@code remapToSuperset} and {@code remap} are defined elsewhere; this method treats a
+     * null remap array as "the input's value array needs no index translation" - confirm against their definitions.
+     *
+     * @return whatever {@code constructor.construct} returns for the chosen arrays
+     */
+    private static <K extends Comparable<? super K>, V extends Comparable<? super V>, T>
+    T linearUnion(K[] leftKeys, int leftKeysLength, V[] leftValues, int leftValuesLength, int[] left, int leftLength,
+                  K[] rightKeys, int rightKeysLength, V[] rightValues, int rightValuesLength, int[] right, int rightLength,
+                  ObjectBuffers<K> keyBuffers, ObjectBuffers<V> valueBuffers, IntBuffers intBuffers, DepsConstructor<K, V, T> constructor)
+    {
+        K[] outKeys = null;
+        V[] outValues = null;
+        int[] remapLeft = null, remapRight = null, out = null;
+        int outLength = 0, outKeysLength = 0, outTxnIdsLength = 0;
+
+        try
+        {
+            // TODO: this is a little clunky for getting back the buffer and its length
+            outKeys = SortedArrays.linearUnion(leftKeys, leftKeysLength, rightKeys, rightKeysLength, keyBuffers);
+            outKeysLength = keyBuffers.lengthOfLast(outKeys);
+            outValues = SortedArrays.linearUnion(leftValues, leftValuesLength, rightValues, rightValuesLength, valueBuffers);
+            outTxnIdsLength = valueBuffers.lengthOfLast(outValues);
+
+            remapLeft = remapToSuperset(leftValues, leftValuesLength, outValues, outTxnIdsLength, intBuffers);
+            remapRight = remapToSuperset(rightValues, rightValuesLength, outValues, outTxnIdsLength, intBuffers);
+
+            // fast path: neither side needs remapping and keys and mappings are element-wise equal,
+            // so the left input can be used as the result as-is
+            if (remapLeft == null && remapRight == null && leftKeysLength == rightKeysLength
+                    && Arrays.equals(left, 0, leftLength, right, 0, rightLength)
+                    && Arrays.equals(leftKeys, 0, leftKeysLength, rightKeys, 0, rightKeysLength)
+                )
+            {
+                return constructor.construct(leftKeys, leftKeysLength, leftValues, leftValuesLength, left, leftLength);
+            }
+
+            // lk/rk: index of the current key on each side; ok: number of output key offsets decided so far;
+            // l/r: read cursors into each side's mapping, starting just past its key offsets
+            int lk = 0, rk = 0, ok = 0, l = leftKeysLength, r = rightKeysLength;
+            // output entries start after one end offset per output key
+            outLength = outKeysLength;
+
+            if (remapLeft == null && outKeys == leftKeys)
+            {
+                // "this" knows all the TxnId and Keys already, but do both agree on what Keys map to TxnIds?
+                noOp: while (lk < leftKeysLength && rk < rightKeysLength)
+                {
+                    int ck = leftKeys[lk].compareTo(rightKeys[rk]);
+                    if (ck < 0)
+                    {
+                        // "this" knows of a key not present in "that"
+                        outLength += left[lk] - l; // logically append the key's TxnIds to the size
+                        l = left[lk];
+                        assert outLength == l && ok == lk && left[ok] == outLength;
+                        ok++;
+                        lk++;
+                    }
+                    else if (ck > 0)
+                    {
+                        // if this happened there is a bug with keys.union or keys are not actually sorted
+                        throwUnexpectedMissingKeyException(leftKeys, lk, leftKeysLength, rightKeys, rk, rightKeysLength, true);
+                    }
+                    else
+                    {
+                        // both "this" and "that" know of the key
+                        while (l < left[lk] && r < right[rk])
+                        {
+                            int nextLeft = left[l];
+                            int nextRight = remap(right[r], remapRight);
+
+                            if (nextLeft < nextRight)
+                            {
+                                // "this" knows of the txn that "that" didn't
+                                outLength++;
+                                l++;
+                            }
+                            else if (nextRight < nextLeft)
+                            {
+                                // "that" has an entry "this" lacks: copy what we have verified so far into a
+                                // fresh buffer and let the general merge below continue from the current cursors
+                                out = copy(left, outLength, leftLength + rightLength - r, intBuffers);
+                                break noOp;
+                            }
+                            else
+                            {
+                                outLength++;
+                                l++;
+                                r++;
+                            }
+                        }
+
+                        if (l < left[lk])
+                        {
+                            outLength += left[lk] - l;
+                            l = left[lk];
+                        }
+                        else if (r < right[rk])
+                        {
+                            // "that" thinks a key includes a TxnId as a dependency but "this" doesn't, need to include this knowledge
+                            out = copy(left, outLength, leftLength + rightLength - r, intBuffers);
+                            break;
+                        }
+
+                        assert outLength == l && ok == lk && left[ok] == outLength;
+                        ok++;
+                        rk++;
+                        lk++;
+                    }
+                }
+
+                // no difference found: the left input already is the union
+                if (out == null)
+                    return constructor.construct(leftKeys, leftKeysLength, leftValues, leftValuesLength, left, leftLength);
+            }
+            else if (remapRight == null && outKeys == rightKeys)
+            {
+                // "that" knows all the TxnId and keys already, but "this" does not
+                noOp: while (lk < leftKeysLength && rk < rightKeysLength)
+                {
+                    int ck = leftKeys[lk].compareTo(rightKeys[rk]);
+                    if (ck < 0)
+                    {
+                        // if this happened there is a bug with keys.union or keys are not actually sorted
+                        throwUnexpectedMissingKeyException(leftKeys, lk, leftKeysLength, rightKeys, rk, rightKeysLength, false);
+                    }
+                    else if (ck > 0)
+                    {
+                        outLength += right[rk] - r;
+                        r = right[rk];
+                        assert outLength == r && ok == rk && right[ok] == outLength;
+                        ok++;
+                        rk++;
+                    }
+                    else
+                    {
+                        // both "this" and "that" know of the key
+                        while (l < left[lk] && r < right[rk])
+                        {
+                            int nextLeft = remap(left[l], remapLeft);
+                            int nextRight = right[r];
+
+                            if (nextLeft < nextRight)
+                            {
+                                // "this" thinks a TxnID depends on Key but "that" doesn't, need to include this knowledge
+                                out = copy(right, outLength, rightLength + leftLength - l, intBuffers);
+                                break noOp;
+                            }
+                            else if (nextRight < nextLeft)
+                            {
+                                // "that" knows of the txn that "this" didn't
+                                outLength++;
+                                r++;
+                            }
+                            else
+                            {
+                                outLength++;
+                                l++;
+                                r++;
+                            }
+                        }
+
+                        if (l < left[lk])
+                        {
+                            out = copy(right, outLength, rightLength + leftLength - l, intBuffers);
+                            break;
+                        }
+                        else if (r < right[rk])
+                        {
+                            outLength += right[rk] - r;
+                            r = right[rk];
+                        }
+
+                        assert outLength == r && ok == rk && right[ok] == outLength;
+                        ok++;
+                        rk++;
+                        lk++;
+                    }
+                }
+
+                // no difference found: the right input already is the union
+                if (out == null)
+                    return constructor.construct(rightKeys, rightKeysLength, rightValues, rightValuesLength, right, rightLength);
+            }
+            else
+            {
+                out = intBuffers.getInts(leftLength + rightLength);
+            }
+
+            // general merge: continues from wherever the cursors currently stand, writing remapped
+            // entries into out and recording each output key's end offset in out[ok]
+            while (lk < leftKeysLength && rk < rightKeysLength)
+            {
+                int ck = leftKeys[lk].compareTo(rightKeys[rk]);
+                if (ck < 0)
+                {
+                    while (l < left[lk])
+                        out[outLength++] = remap(left[l++], remapLeft);
+                    out[ok++] = outLength;
+                    lk++;
+                }
+                else if (ck > 0)
+                {
+                    while (r < right[rk])
+                        out[outLength++] = remap(right[r++], remapRight);
+                    out[ok++] = outLength;
+                    rk++;
+                }
+                else
+                {
+                    // same key on both sides: merge the two entry lists, emitting equal entries once
+                    while (l < left[lk] && r < right[rk])
+                    {
+                        int nextLeft = remap(left[l], remapLeft);
+                        int nextRight = remap(right[r], remapRight);
+
+                        if (nextLeft <= nextRight)
+                        {
+                            out[outLength++] = nextLeft;
+                            l += 1;
+                            r += nextLeft == nextRight ? 1 : 0;
+                        }
+                        else
+                        {
+                            out[outLength++] = nextRight;
+                            ++r;
+                        }
+                    }
+
+                    while (l < left[lk])
+                        out[outLength++] = remap(left[l++], remapLeft);
+
+                    while (r < right[rk])
+                        out[outLength++] = remap(right[r++], remapRight);
+
+                    out[ok++] = outLength;
+                    rk++;
+                    lk++;
+                }
+            }
+
+            // drain whichever side still has keys left
+            while (lk < leftKeysLength)
+            {
+                while (l < left[lk])
+                    out[outLength++] = remap(left[l++], remapLeft);
+                out[ok++] = outLength;
+                lk++;
+            }
+
+            while (rk < rightKeysLength)
+            {
+                while (r < right[rk])
+                    out[outLength++] = remap(right[r++], remapRight);
+                out[ok++] = outLength;
+                rk++;
+            }
+
+            return constructor.construct(outKeys, outKeysLength, outValues, outTxnIdsLength, out, outLength);
+        }
+        finally
+        {
+            // hand every buffer we obtained to its manager, on both normal and exceptional exit
+            if (outKeys != null)
+                keyBuffers.discard(outKeys, outKeysLength);
+            if (outValues != null)
+                valueBuffers.discard(outValues, outTxnIdsLength);
+            if (out != null)
+                intBuffers.discard(out, outLength);
+            if (remapLeft != null)
+                intBuffers.forceDiscard(remapLeft);
+            if (remapRight != null)
+                intBuffers.forceDiscard(remapRight);
+        }
+    }
+
+    /**
+     * Always throws an {@link IllegalStateException} describing a key that one input of a merge contained
+     * although the other input was expected to contain every key.
+     *
+     * @param leftKeys       keys of the left input
+     * @param leftKeyIndex   position in {@code leftKeys} at which the mismatch was detected
+     * @param leftKeyLength  number of used elements in {@code leftKeys}
+     * @param rightKeys      keys of the right input
+     * @param rightKeyIndex  position in {@code rightKeys} at which the mismatch was detected
+     * @param rightKeyLength number of used elements in {@code rightKeys}
+     * @param isMissingLeft  true if the left input was the one expected to know every key (and so the right
+     *                       input holds the unexpected key); false for the opposite case
+     * @throws IllegalStateException always
+     */
+    private static <A> void throwUnexpectedMissingKeyException(A[] leftKeys, int leftKeyIndex, int leftKeyLength, A[] rightKeys, int rightKeyIndex, int rightKeyLength, boolean isMissingLeft)
+    {
+        StringBuilder sb = new StringBuilder();
+        String missing = isMissingLeft ? "left" : "right";
+        String extra = isMissingLeft ? "right" : "left";
+        sb.append(missing).append(" knows all keys, yet ").append(extra).append(" knew of an extra key at indexes left[")
+                .append(leftKeyIndex).append("] = ").append(leftKeys[leftKeyIndex])
+                .append(", right[").append(rightKeyIndex).append("] = ").append(rightKeys[rightKeyIndex]).append("\n");
+        // separate and bracket the keys; joining them with no delimiter makes the dump unreadable
+        sb.append("leftKeys = ").append(Arrays.stream(leftKeys, 0, leftKeyLength).map(String::valueOf).collect(Collectors.joining(", ", "[", "]"))).append('\n');
+        sb.append("rightKeys = ").append(Arrays.stream(rightKeys, 0, rightKeyLength).map(String::valueOf).collect(Collectors.joining(", ", "[", "]"))).append('\n');
+        throw new IllegalStateException(sb.toString());
+    }
+
+    /**
+     * Obtains an int buffer of at least {@code length} elements from {@code bufferManager} and copies the
+     * first {@code to} elements of {@code src} into it.
+     *
+     * @return {@code NO_INTS} when {@code length} is zero, otherwise the populated buffer
+     * @throws IllegalStateException if the buffer manager returns an array shorter than {@code length}
+     */
+    private static int[] copy(int[] src, int to, int length, IntBuffers bufferManager)
+    {
+        if (length == 0)
+            return NO_INTS;
+
+        int[] buffer = bufferManager.getInts(length);
+        if (buffer.length >= length)
+        {
+            System.arraycopy(src, 0, buffer, 0, to);
+            return buffer;
+        }
+        throw new IllegalStateException();
+    }
+
+    // TODO: optimise for case where none removed
+    /**
+     * Returns a Deps without the TxnIds matched by {@code remove}.
+     * The key set is kept as it is, so a key may end up with an empty TxnId list.
+     *
+     * @param remove tested once against every TxnId of this Deps; true means "drop it"
+     * @return this instance if it is empty or nothing matched, {@code NONE} if everything matched,
+     *         otherwise a new Deps over the same keys
+     */
+    public Deps without(Predicate<TxnId> remove)
+    {
+        if (isEmpty())
+            return this;
+
+        // old TxnId index -> new TxnId index, or -1 if the TxnId is removed
+        int[] remapTxnIds = new int[txnIds.length];
+        TxnId[] txnIds; {
+            int count = 0;
+            for (int i = 0 ; i < this.txnIds.length ; ++i)
+            {
+                if (remove.test(this.txnIds[i])) remapTxnIds[i] = -1;
+                else remapTxnIds[i] = count++;
+            }
+
+            if (count == remapTxnIds.length)
+                return this;
+
+            if (count == 0)
+                return NONE;
+
+            txnIds = new TxnId[count];
+            for (int i = 0 ; i < this.txnIds.length ; ++i)
+            {
+                if (remapTxnIds[i] >= 0)
+                    txnIds[remapTxnIds[i]] = this.txnIds[i];
+            }
+        }
+
+        // k: key whose end offset is written next; i: read position in the old mapping; o: write position in the new one
+        int[] keyToTxnId = new int[this.keyToTxnId.length];
+        int k = 0, i = keys.size(), o = i;
+        while (i < this.keyToTxnId.length)
+        {
+            // every key whose old list ends at i ends at o in the new mapping
+            while (this.keyToTxnId[k] == i)
+                keyToTxnId[k++] = o;
+
+            int remapped = remapTxnIds[this.keyToTxnId[i]];
+            if (remapped >= 0)
+                keyToTxnId[o++] = remapped;
+            ++i;
+        }
+
+        // the remaining keys all end at the final write position
+        while (k < keys.size())
+            keyToTxnId[k++] = o;
+
+        // trim the unused tail left behind by the removed entries
+        keyToTxnId = Arrays.copyOf(keyToTxnId, o);
+
+        return new Deps(keys, txnIds, keyToTxnId);
+    }
+
+    public boolean contains(TxnId txnId)
+    {
+        return Arrays.binarySearch(txnIds, txnId) >= 0;
+    }
+
+    // return true iff we do not map any key to any txnId, i.e. keyToTxnId holds nothing beyond the per-key offsets;
+    // if the mapping is empty we return true, whether or not we have any keys or txnId by themselves
+    public boolean isEmpty()
+    {
+        return keyToTxnId.length == keys.size();
+    }
+
+    public Keys someKeys(TxnId txnId)
+    {
+        int txnIdIndex = Arrays.binarySearch(txnIds, txnId);
+        if (txnIdIndex < 0)
+            return Keys.EMPTY;
+
+        ensureTxnIdToKey();
+
+        int start = txnIdIndex == 0 ? txnIds.length : txnIdToKey[txnIdIndex - 1];
+        int end = txnIdToKey[txnIdIndex];
+        if (start == end)
+            return Keys.EMPTY;
+
+        Key[] result = new Key[end - start];
+        for (int i = start ; i < end ; ++i)
+            result[i - start] = keys.get(txnIdToKey[i]);
+        return Keys.of(result);
+    }
+
+    /**
+     * Populates {@code txnIdToKey} by inverting {@code keyToTxnId}, unless it has been populated already.
+     */
+    private void ensureTxnIdToKey()
+    {
+        if (txnIdToKey == null)
+            txnIdToKey = invert(keyToTxnId, keyToTxnId.length, keys.size(), txnIds.length);
+    }
+
+    /**
+     * Inverts a mapping stored in the {@code keyToTxnId} layout (one end offset per source key, followed by
+     * the per-key lists of target indexes) into the same layout keyed by target index.
+     *
+     * @param src         the mapping to invert
+     * @param srcLength   number of used elements in {@code src}
+     * @param srcKeyCount number of source keys, i.e. number of leading end offsets in {@code src}
+     * @param trgKeyCount number of distinct targets, i.e. number of leading end offsets in the result
+     * @return the inverted mapping, of length {@code trgKeyCount + srcLength - srcKeyCount}
+     */
+    private static int[] invert(int[] src, int srcLength, int srcKeyCount, int trgKeyCount)
+    {
+        // a mapping with no targets and no entries inverts to an empty mapping;
+        // without this guard the offset bookkeeping below would index into an empty array
+        if (trgKeyCount == 0 && srcLength == srcKeyCount)
+            return new int[0];
+
+        int[] trg = new int[trgKeyCount + srcLength - srcKeyCount];
+
+        // first pass, count number of txnId per key
+        for (int i = srcKeyCount ; i < srcLength ; ++i)
+            trg[src[i]]++;
+
+        // turn into offsets (i.e. add txnIds.size() and then sum them)
+        trg[0] += trgKeyCount;
+        for (int i = 1; i < trgKeyCount ; ++i)
+            trg[i] += trg[i - 1];
+
+        // shuffle forwards one, so we have the start index rather than end
+        System.arraycopy(trg, 0, trg, 1, trgKeyCount - 1);
+        trg[0] = trgKeyCount;
+
+        // convert the offsets to end, and set the key at the target positions
+        int k = 0;
+        for (int i = srcKeyCount ; i < srcLength ; ++i)
+        {
+            // if at the end offset, switch to the next key
+            while (i == src[k])
+                ++k;
+
+            // find the next key offset for the TxnId and set the offset to this key
+            trg[trg[src[i]]++] = k;
+        }
+
+        return trg;
+    }
+
+    public void forEachOn(KeyRanges ranges, Predicate<Key> include, BiConsumer<Key, TxnId> forEach)
+    {
+        keys.foldl(ranges, (index, key, value) -> {
+            if (!include.test(key))
+                return null;
+
+            for (int t = index == 0 ? keys.size() : keyToTxnId[index - 1], end = keyToTxnId[index]; t < end ; ++t)
+            {
+                TxnId txnId = txnIds[keyToTxnId[t]];
+                forEach.accept(key, txnId);
+            }
+            return null;
+        }, null);
+    }
+
+    /**
+     * For each {@link TxnId} that references a key within the {@link KeyRanges}; the {@link TxnId} will be seen exactly once.
+     * @param ranges to match on
+     * @param include function to say if a key should be used or not
+     * @param forEach function to call on each unique {@link TxnId}
+     */
+    public void forEachOn(KeyRanges ranges, Predicate<Key> include, Consumer<TxnId> forEach)
+    {
+        // Find all keys within the ranges, but record existence within an int64 bitset.  Since the bitset is limited
+        // to 64, this search must be called multiple times searching for different TxnIds in txnIds; this also has
+        // the property that forEach is called in TxnId order.
+        //TODO Should TxnId order be part of the public docs or just a hidden implementation detail?  The only caller
+        // does not rely on this ordering.
+        for (int offset = 0 ; offset < txnIds.length ; offset += 64)
+        {
+            // bit b of the result is set iff txnIds[offset + b] is referenced by an included key
+            // NOTE(review): the trailing foldl arguments look like (parameter, initial value, terminal value) - confirm against Keys.foldl
+            long bitset = keys.foldl(ranges, (keyIndex, key, off, value) -> {
+                if (!include.test(key))
+                    return value;
+
+                int index = keyIndex == 0 ? keys.size() : keyToTxnId[keyIndex - 1];
+                int end = keyToTxnId[keyIndex];
+                if (off > 0)
+                {
+                    // TODO: interpolation search probably great here
+                    // jump to the first entry of this key that is >= off
+                    index = Arrays.binarySearch(keyToTxnId, index, end, (int)off);
+                    if (index < 0)
+                        index = -1 - index;
+                }
+
+                while (index < end)
+                {
+                    long next = keyToTxnId[index++] - off;
+                    // entries beyond this 64-wide window are handled by a later pass
+                    if (next >= 64)
+                        break;
+                    value |= 1L << next;
+                }
+
+                return value;
+            }, offset, 0, -1L);
+
+            // visit the set bits from lowest to highest, clearing each one as it is consumed
+            while (bitset != 0)
+            {
+                int i = Long.numberOfTrailingZeros(bitset);
+                TxnId txnId = txnIds[offset + i];
+                forEach.accept(txnId);
+                bitset ^= Long.lowestOneBit(bitset);
+            }
+        }
+    }
+
+    public void forEach(Key key, Consumer<TxnId> forEach)
+    {
+        int keyIndex = keys.indexOf(key);
+        if (keyIndex < 0)
+            return;
+
+        int index = keyIndex == 0 ? keys.size() : keyToTxnId[keyIndex - 1];
+        int end = keyToTxnId[keyIndex];
+        while (index < end)
+            forEach.accept(txnIds[keyToTxnId[index++]]);
+    }
+
+    /**
+     * @return the keys of this Deps (the internal instance, not a copy)
+     */
+    public Keys keys()
+    {
+        return keys;
+    }
+
+    /**
+     * @return the length of the {@code txnIds} array
+     */
+    public int txnIdCount()
+    {
+        return txnIds.length;
+    }
+
+    /**
+     * @return the number of entries in {@code keyToTxnId} after the leading per-key offsets,
+     *         i.e. the total number of (key, TxnId) pairs
+     */
+    public int totalCount()
+    {
+        return keyToTxnId.length - keys.size();
+    }
+
+    /**
+     * @param i index into the {@code txnIds} array
+     * @return the TxnId stored at index {@code i}
+     */
+    public TxnId txnId(int i)
+    {
+        return txnIds[i];
+    }
+
+    /**
+     * @return an unmodifiable list holding the elements of the {@code txnIds} array, in array order
+     */
+    public Collection<TxnId> txnIds()
+    {
+        return List.of(txnIds);
+    }
+
+    @Override
+    public Iterator<Map.Entry<Key, TxnId>> iterator()
+    {
+        return new Iterator<>()
+        {
+            int i = keys.size(), k = 0;
+
+            @Override
+            public boolean hasNext()
+            {
+                return i < keyToTxnId.length;
+            }
+
+            @Override
+            public Map.Entry<Key, TxnId> next()
+            {
+                Entry result = new Entry(keys.get(k), txnIds[keyToTxnId[i++]]);
+                if (i == keyToTxnId[k])
+                    ++k;
+                return result;
+            }
+        };
+    }
+
+    @Override
+    public String toString()
+    {
+        if (keys.isEmpty())
+            return "{}";
+
+        StringBuilder builder = new StringBuilder("{");
+        for (int k = 0, t = keys.size(); k < keys.size() ; ++k)
+        {
+            if (builder.length() > 1)
+                builder.append(", ");
+
+            builder.append(keys.get(k));
+            builder.append(":[");
+            boolean first = true;
+            while (t < keyToTxnId[k])
+            {
+                if (first) first = false;
+                else builder.append(", ");
+                builder.append(txnIds[keyToTxnId[t++]]);
+            }
+            builder.append("]");
+        }
+        builder.append("}");
+        return builder.toString();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        return equals((Deps) o);
+    }
+
+    public boolean equals(Deps that)
+    {
+        return this.txnIds.length == that.txnIds.length
+               && this.keys.size() == that.keys.size()
+               && Arrays.equals(this.keyToTxnId, that.keyToTxnId)
+               && Arrays.equals(this.txnIds, that.txnIds)
+               && this.keys.equals(that.keys);
+    }
+
+    public static class Entry implements Map.Entry<Key, TxnId>
+    {
+        final Key key;
+        final TxnId txnId;
+
+        public Entry(Key key, TxnId txnId)
+        {
+            this.key = key;
+            this.txnId = txnId;
+        }
+
+        @Override
+        public Key getKey()
+        {
+            return key;
+        }
+
+        @Override
+        public TxnId getValue()
+        {
+            return txnId;
+        }
+
+        @Override
+        public TxnId setValue(TxnId value)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String toString()
+        {
+            return key + "->" + txnId;
+        }
+    }
+
+    /**
+     * Debug-only sanity check: verifies that no key lists the same TxnId index twice in a row.
+     *
+     * @throws IllegalStateException if two adjacent entries of one key's list are equal
+     */
+    private void checkValid()
+    {
+        int keyCount = keys.size();
+        for (int k = 0 ; k < keyCount ; ++k)
+        {
+            // the entries of key k occupy [start, end) in keyToTxnId
+            int start = k == 0 ? keyCount : keyToTxnId[k - 1];
+            int end = keyToTxnId[k];
+            // compare every entry of this key with its predecessor within the same key
+            for (int i = start + 1 ; i < end ; ++i)
+            {
+                if (keyToTxnId[i - 1] == keyToTxnId[i])
+                {
+                    Key key = keys.get(k);
+                    TxnId txnId = txnIds[keyToTxnId[i]];
+                    throw new IllegalStateException(String.format("Duplicate TxnId (%s) found for key %s", txnId, key));
+                }
+            }
+        }
+    }
+
+}
similarity index 51%
rename from accord-core/src/main/java/accord/topology/KeyRange.java
rename to accord-core/src/main/java/accord/primitives/KeyRange.java
index a3aabef7335bb8a520a759a4e07c1fc68e9b893a..422c38ee820613cdd55b1631686e97234d7a40fb 100644 (file)
@@ -1,31 +1,33 @@
-package accord.topology;
+package accord.primitives;
 
 import accord.api.Key;
-import accord.txn.Keys;
+
+import accord.utils.SortedArrays;
 import com.google.common.base.Preconditions;
 
 import java.util.Objects;
 
+import static accord.utils.SortedArrays.Search.*;
+
 /**
  * A range of keys
- * @param <K>
  */
-public abstract class KeyRange<K extends Key<K>>
+public abstract class KeyRange implements Comparable<Key>
 {
-    public static abstract class EndInclusive<K extends Key<K>> extends KeyRange<K>
+    public static class EndInclusive extends KeyRange
     {
-        public EndInclusive(K start, K end)
+        public EndInclusive(Key start, Key end)
         {
             super(start, end);
         }
 
         @Override
-        public int compareKey(K key)
+        public int compareTo(Key key)
         {
             if (key.compareTo(start()) <= 0)
-                return -1;
-            if (key.compareTo(end()) > 0)
                 return 1;
+            if (key.compareTo(end()) > 0)
+                return -1;
             return 0;
         }
 
@@ -42,26 +44,32 @@ public abstract class KeyRange<K extends Key<K>>
         }
 
         @Override
-        public KeyRange<K> tryMerge(KeyRange<K> that)
+        public KeyRange subRange(Key start, Key end)
+        {
+            return new EndInclusive(start, end);
+        }
+
+        @Override
+        public KeyRange tryMerge(KeyRange that)
         {
             return KeyRange.tryMergeExclusiveInclusive(this, that);
         }
     }
 
-    public static abstract class StartInclusive<K extends Key<K>> extends KeyRange<K>
+    public static class StartInclusive extends KeyRange
     {
-        public StartInclusive(K start, K end)
+        public StartInclusive(Key start, Key end)
         {
             super(start, end);
         }
 
         @Override
-        public int compareKey(K key)
+        public int compareTo(Key key)
         {
             if (key.compareTo(start()) < 0)
-                return -1;
-            if (key.compareTo(end()) >= 0)
                 return 1;
+            if (key.compareTo(end()) >= 0)
+                return -1;
             return 0;
         }
 
@@ -78,13 +86,75 @@ public abstract class KeyRange<K extends Key<K>>
         }
 
         @Override
-        public KeyRange<K> tryMerge(KeyRange<K> that)
+        public KeyRange subRange(Key start, Key end)
+        {
+            return new StartInclusive(start, end);
+        }
+
+        @Override
+        public KeyRange tryMerge(KeyRange that)
         {
             return KeyRange.tryMergeExclusiveInclusive(this, that);
         }
     }
 
-    private static <K extends Key<K>> KeyRange<K> tryMergeExclusiveInclusive(KeyRange<K> left, KeyRange<K> right)
+    public static KeyRange range(Key start, Key end, boolean startInclusive, boolean endInclusive)
+    {
+        return new KeyRange(start, end) {
+
+            @Override
+            public boolean startInclusive()
+            {
+                return startInclusive;
+            }
+
+            @Override
+            public boolean endInclusive()
+            {
+                return endInclusive;
+            }
+
+            @Override
+            public KeyRange tryMerge(KeyRange that)
+            {
+                return KeyRange.tryMergeExclusiveInclusive(this, that);
+            }
+
+            @Override
+            public KeyRange subRange(Key start, Key end)
+            {
+                throw new UnsupportedOperationException("subRange");
+            }
+
+            @Override
+            public int compareTo(Key key)
+            {
+                if (startInclusive)
+                {
+                    if (key.compareTo(start()) < 0)
+                        return 1;
+                }
+                else
+                {
+                    if (key.compareTo(start()) <= 0)
+                        return 1;
+                }
+                if (endInclusive)
+                {
+                    if (key.compareTo(end()) > 0)
+                        return -1;
+                }
+                else
+                {
+                    if (key.compareTo(end()) >= 0)
+                        return -1;
+                }
+                return 0;
+            }
+        };
+    }
+
+    private static KeyRange tryMergeExclusiveInclusive(KeyRange left, KeyRange right)
     {
         if (left.getClass() != right.getClass())
             return null;
@@ -106,22 +176,25 @@ public abstract class KeyRange<K extends Key<K>>
         return null;
     }
 
-    private final K start;
-    private final K end;
+    private final Key start;
+    private final Key end;
 
-    private KeyRange(K start, K end)
+    private KeyRange(Key start, Key end)
     {
-        Preconditions.checkArgument(start.compareTo(end) < 0);
+        if (start.compareTo(end) >= 0)
+            throw new IllegalArgumentException(start + " >= " + end);
+        if (startInclusive() == endInclusive())
+            throw new IllegalStateException("KeyRange must have one side inclusive, and the other exclusive. KeyRange of different types should not be mixed.");
         this.start = start;
         this.end = end;
     }
 
-    public final K start()
+    public final Key start()
     {
         return start;
     }
 
-    public final K end()
+    public final Key end()
     {
         return end;
     }
@@ -134,16 +207,16 @@ public abstract class KeyRange<K extends Key<K>>
      * Return a new range covering this and the given range if the ranges are intersecting or touching. That is,
      * no keys can exist between the touching ends of the range.
      */
-    public abstract KeyRange<K> tryMerge(KeyRange<K> that);
+    public abstract KeyRange tryMerge(KeyRange that);
 
-    public abstract KeyRange<K> subRange(K start, K end);
+    public abstract KeyRange subRange(Key start, Key end);
 
     @Override
     public boolean equals(Object o)
     {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-        KeyRange<?> that = (KeyRange<?>) o;
+        KeyRange that = (KeyRange) o;
         return Objects.equals(start, that.start) && Objects.equals(end, that.end);
     }
 
@@ -163,20 +236,31 @@ public abstract class KeyRange<K extends Key<K>>
      * Returns a negative integer, zero, or a positive integer as the provided key is less than, contained by,
      * or greater than this range.
      */
-    public abstract int compareKey(K key);
+    public int compareKey(Key key)
+    {
+        return -compareTo(key);
+    }
 
-    public boolean containsKey(K key)
+    /**
+     * Returns a negative integer, zero, or a positive integer as the provided key is greater than, contained by,
+     * or less than this range.
+     */
+    @Override
+    public abstract int compareTo(Key key);
+
+    public boolean containsKey(Key key)
     {
         return compareKey(key) == 0;
     }
 
-
     /**
      * Returns a negative integer, zero, or a positive integer if both points of the provided range are less than, the
      * range intersects this range, or both points are greater than this range
      */
-    public int compareIntersecting(KeyRange<K> that)
+    public int compareIntersecting(KeyRange that)
     {
+        if (that.getClass() != this.getClass())
+            throw new IllegalArgumentException("Cannot mix KeyRange of different types");
         if (this.start.compareTo(that.end) >= 0)
             return 1;
         if (this.end.compareTo(that.start) <= 0)
@@ -184,71 +268,54 @@ public abstract class KeyRange<K extends Key<K>>
         return 0;
     }
 
-    public boolean intersects(KeyRange<K> that)
+    public boolean intersects(KeyRange that)
     {
         return compareIntersecting(that) == 0;
     }
 
-    public boolean fullyContains(KeyRange<K> that)
+    public boolean fullyContains(KeyRange that)
     {
         return that.start.compareTo(this.start) >= 0 && that.end.compareTo(this.end) <= 0;
     }
 
     public boolean intersects(Keys keys)
     {
-        return lowKeyIndex(keys) >= 0;
+        return SortedArrays.binarySearch(keys.keys, 0, keys.size(), this, KeyRange::compareTo, FAST) >= 0;
     }
 
     /**
      * Returns a range covering the overlapping parts of this and the provided range, returns
      * null if the ranges do not overlap
      */
-    public KeyRange<K> intersection(KeyRange<K> that)
+    public KeyRange intersection(KeyRange that)
     {
         if (this.compareIntersecting(that) != 0)
             return null;
 
-        K start = this.start.compareTo(that.start) > 0 ? this.start : that.start;
-        K end = this.end.compareTo(that.end) < 0 ? this.end : that.end;
+        Key start = this.start.compareTo(that.start) > 0 ? this.start : that.start;
+        Key end = this.end.compareTo(that.end) < 0 ? this.end : that.end;
         return subRange(start, end);
     }
 
     /**
      * returns the index of the first key larger than what's covered by this range
      */
-    public int higherKeyIndex(Keys keys, int lowerBound, int upperBound)
+    public int nextHigherKeyIndex(Keys keys, int from)
     {
-        int i = keys.search(lowerBound, upperBound, this,
-                            (k, r) -> ((KeyRange) r).compareKey((Key) k) <= 0 ? -1 : 1);
+        int i = SortedArrays.exponentialSearch(keys.keys, from, keys.size(), this, KeyRange::compareTo, FLOOR);
         if (i < 0) i = -1 - i;
+        else i = i + 1;
         return i;
     }
 
-    public int higherKeyIndex(Keys keys)
-    {
-        return higherKeyIndex(keys, 0, keys.size());
-    }
-
     /**
      * returns the index of the lowest key contained in this range. If the keys object contains no intersecting
      * keys, <code>(-(<i>insertion point</i>) - 1)</code> is returned. Where <i>insertion point</i> is where an
      * intersecting key would be inserted into the keys array
      * @param keys
      */
-    public int lowKeyIndex(Keys keys, int lowerBound, int upperBound)
-    {
-        if (keys.isEmpty()) return -1;
-
-        int i = keys.search(lowerBound, upperBound, this,
-                            (k, r) -> ((KeyRange) r).compareKey((Key) k) < 0 ? -1 : 1);
-
-        int minIdx = -1 - i;
-
-        return (minIdx < keys.size() && containsKey((K) keys.get(minIdx))) ? minIdx : i;
-    }
-
-    public int lowKeyIndex(Keys keys)
+    public int nextCeilKeyIndex(Keys keys, int from)
     {
-        return lowKeyIndex(keys, 0, keys.size());
+        return SortedArrays.exponentialSearch(keys.keys, from, keys.size(), this, KeyRange::compareTo, CEIL);
     }
 }
diff --git a/accord-core/src/main/java/accord/primitives/KeyRanges.java b/accord-core/src/main/java/accord/primitives/KeyRanges.java
new file mode 100644 (file)
index 0000000..490beae
--- /dev/null
@@ -0,0 +1,453 @@
+package accord.primitives;
+
+import accord.api.Key;
+import accord.utils.SortedArrays;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+import java.util.*;
+
+import static accord.utils.SortedArrays.Search.FAST;
+
+public class KeyRanges implements Iterable<KeyRange>
+{
+    public static final KeyRanges EMPTY = ofSortedAndDeoverlappedUnchecked(new KeyRange[0]);
+
+    final KeyRange[] ranges;
+
+    private KeyRanges(KeyRange[] ranges)
+    {
+        Preconditions.checkNotNull(ranges);
+        this.ranges = ranges;
+    }
+
+    public KeyRanges(List<KeyRange> ranges)
+    {
+        this(ranges.toArray(KeyRange[]::new));
+    }
+
+    @Override
+    public String toString()
+    {
+        return Arrays.toString(ranges);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        KeyRanges ranges1 = (KeyRanges) o;
+        return Arrays.equals(ranges, ranges1.ranges);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Arrays.hashCode(ranges);
+    }
+
+    @Override
+    public Iterator<KeyRange> iterator()
+    {
+        return Iterators.forArray(ranges);
+    }
+
+    public int rangeIndexForKey(int lowerBound, int upperBound, Key key)
+    {
+        return SortedArrays.binarySearch(ranges, lowerBound, upperBound, key, (k, r) -> r.compareKey(k), FAST);
+    }
+
+    public int rangeIndexForKey(Key key)
+    {
+        return rangeIndexForKey(0, ranges.length, key);
+    }
+
+    public boolean contains(Key key)
+    {
+        return rangeIndexForKey(key) >= 0;
+    }
+
+    public boolean containsAll(Keys keys)
+    {
+        return keys.rangeFoldl(this, (from, to, p, v) -> v + (to - from), 0, 0, 0) == keys.size();
+    }
+
+    public int size()
+    {
+        return ranges.length;
+    }
+
+    public KeyRange get(int i)
+    {
+        return ranges[i];
+    }
+
+    public boolean isEmpty()
+    {
+        return size() == 0;
+    }
+
+    public KeyRanges select(int[] indexes)
+    {
+        KeyRange[] selection = new KeyRange[indexes.length];
+        for (int i=0; i<indexes.length; i++)
+            selection[i] = ranges[indexes[i]];
+        return ofSortedAndDeoverlapped(selection);
+    }
+
+    public boolean intersects(Keys keys)
+    {
+        return findNextIntersection(0, keys, 0) >= 0;
+    }
+
+    public boolean intersects(KeyRanges that)
+    {
+        return SortedArrays.findNextIntersection(this.ranges, 0, that.ranges, 0, KeyRange::compareIntersecting) >= 0;
+    }
+
+    public int findFirstKey(Keys keys)
+    {
+        return findNextKey(0, keys, 0);
+    }
+
+    public int findNextKey(int ri, Keys keys, int ki)
+    {
+        return (int) (findNextIntersection(ri, keys, ki) >> 32);
+    }
+
+    // returns ki in top 32 bits, ri in bottom, or -1 if no match found
+    public long findNextIntersection(int ri, Keys keys, int ki)
+    {
+        return SortedArrays.findNextIntersectionWithMultipleMatches(keys.keys, ki, ranges, ri);
+    }
+
+    public int findFirstKey(Key[] keys)
+    {
+        return findNextKey(0, keys, 0);
+    }
+
+    public int findNextKey(int ri, Key[] keys, int ki)
+    {
+        return (int) (findNextIntersection(ri, keys, ki) >> 32);
+    }
+
+    // returns ki in top 32 bits, ri in bottom, or -1 if no match found
+    public long findNextIntersection(int ri, Key[] keys, int ki)
+    {
+        return SortedArrays.findNextIntersectionWithMultipleMatches(keys, ki, ranges, ri);
+    }
+
+    /**
+     * Subtracts the given set of key ranges from this one, walking both range arrays in a single forward pass.
+     *
+     * @param that the ranges to remove
+     * @return the parts of this collection not covered by {@code that}; {@link #EMPTY} when {@code that} is
+     *         this very instance
+     */
+    public KeyRanges difference(KeyRanges that)
+    {
+        if (that == this)
+            return KeyRanges.EMPTY;
+
+        List<KeyRange> result = new ArrayList<>(this.size() + that.size());
+        // thatIdx is shared across iterations of the outer loop: it only ever moves forward
+        int thatIdx = 0;
+
+        for (int thisIdx=0; thisIdx<this.size(); thisIdx++)
+        {
+            // the not-yet-emitted remainder of the current range; null once it has been entirely consumed
+            KeyRange thisRange = this.ranges[thisIdx];
+            while (thatIdx < that.size())
+            {
+                KeyRange thatRange = that.ranges[thatIdx];
+
+                int cmp = thisRange.compareIntersecting(thatRange);
+                if (cmp > 0)
+                {
+                    // thisRange starts at or after the end of thatRange: thatRange cannot affect it
+                    thatIdx++;
+                    continue;
+                }
+                // no intersection and thatRange was not skipped above: nothing left to subtract from thisRange
+                if (cmp < 0) break;
+
+                int scmp = thisRange.start().compareTo(thatRange.start());
+                int ecmp = thisRange.end().compareTo(thatRange.end());
+
+                // keep the piece of thisRange that precedes thatRange
+                if (scmp < 0)
+                    result.add(thisRange.subRange(thisRange.start(), thatRange.start()));
+
+                if (ecmp <= 0)
+                {
+                    // thatRange reaches to (or past) the end of thisRange: nothing remains
+                    thisRange = null;
+                    break;
+                }
+                else
+                {
+                    // continue with the piece of thisRange that follows thatRange
+                    thisRange = thisRange.subRange(thatRange.end(), thisRange.end());
+                    thatIdx++;
+                }
+            }
+            if (thisRange != null)
+                result.add(thisRange);
+        }
+        return new KeyRanges(result.toArray(KeyRange[]::new));
+    }
+
+    /**
+     * attempts a linear merge where {@code as} is expected to be a superset of {@code bs},
+     * terminating at the first indexes where this ceases to be true
+     * @return index of {@code as} in upper 32bits, {@code bs} in lower 32bits
+     *
+     * TODO: better support for merging runs of overlapping or adjacent ranges
+     */
+    private static long supersetLinearMerge(KeyRange[] as, KeyRange[] bs)
+    {
+        int ai = 0, bi = 0;
+        out: while (ai < as.length && bi < bs.length)
+        {
+            KeyRange a = as[ai];
+            KeyRange b = bs[bi];
+
+            int c = a.compareIntersecting(b);
+            if (c < 0)
+            {
+                ai++;
+            }
+            else if (c > 0)
+            {
+                break;
+            }
+            else if (b.start().compareTo(a.start()) < 0)
+            {
+                break;
+            }
+            else if ((c = b.end().compareTo(a.end())) <= 0)
+            {
+                bi++;
+                if (c == 0) ai++;
+            }
+            else
+            {
+                // use a temporary counter, so that if we don't find a run of ranges that enforce the superset
+                // condition we exit at the start of the mismatch run (and permit it to be merged)
+                // TODO: use exponentialSearch
+                int tmpai = ai;
+                do
+                {
+                    if (++tmpai == as.length || !a.end().equals(as[tmpai].start()))
+                        break out;
+                    a = as[tmpai];
+                }
+                while (a.end().compareTo(b.end()) < 0);
+                bi++;
+                ai = tmpai;
+            }
+        }
+
+        return ((long)ai << 32) | bi;
+    }
+
+    /**
+     * @return true iff {@code that} is a subset of {@code this}
+     */
+    public boolean contains(KeyRanges that)
+    {
+        if (this.isEmpty()) return that.isEmpty();
+        if (that.isEmpty()) return true;
+
+        return ((int) supersetLinearMerge(this.ranges, that.ranges)) == that.size();
+    }
+
+    /**
+     * @return the union of {@code this} and {@code that}, returning one of the two inputs if possible
+     */
+    public KeyRanges union(KeyRanges that)
+    {
+        if (this == that) return this;
+        if (this.isEmpty()) return that;
+        if (that.isEmpty()) return this;
+
+        KeyRange[] as = this.ranges, bs = that.ranges;
+        {
+            // make sure as/ai represent the ranges that might fully contain the other
+            int c = as[0].start().compareTo(bs[0].start());
+            if (c > 0 || c == 0 && as[as.length - 1].end().compareTo(bs[bs.length - 1].end()) < 0)
+            {
+                KeyRange[] tmp = as; as = bs; bs = tmp;
+            }
+        }
+
+        int ai, bi; {
+            long tmp = supersetLinearMerge(as, bs);
+            ai = (int)(tmp >>> 32);
+            bi = (int)tmp;
+        }
+
+        if (bi == bs.length)
+            return as == this.ranges ? this : that;
+
+        KeyRange[] result = new KeyRange[as.length + (bs.length - bi)];
+        int resultCount = copyAndMergeTouching(as, 0, result, 0, ai);
+
+        while (ai < as.length && bi < bs.length)
+        {
+            KeyRange a = as[ai];
+            KeyRange b = bs[bi];
+
+            int c = a.compareIntersecting(b);
+            if (c < 0)
+            {
+                result[resultCount++] = a;
+                ai++;
+            }
+            else if (c > 0)
+            {
+                result[resultCount++] = b;
+                bi++;
+            }
+            else
+            {
+                Key start = a.start().compareTo(b.start()) <= 0 ? a.start() : b.start();
+                Key end = a.end().compareTo(b.end()) >= 0 ? a.end() : b.end();
+                ai++;
+                bi++;
+                while (ai < as.length || bi < bs.length)
+                {
+                    KeyRange min;
+                    if (ai == as.length) min = bs[bi];
+                    else if (bi == bs.length) min = a = as[ai];
+                    else min = as[ai].start().compareTo(bs[bi].start()) < 0 ? a = as[ai] : bs[bi];
+                    if (min.start().compareTo(end) > 0)
+                        break;
+                    if (min.end().compareTo(end) > 0)
+                        end = min.end();
+                    if (a == min) ai++;
+                    else bi++;
+                }
+                result[resultCount++] = a.subRange(start, end);
+            }
+        }
+
+        while (ai < as.length)
+            result[resultCount++] = as[ai++];
+
+        while (bi < bs.length)
+            result[resultCount++] = bs[bi++];
+
+        if (resultCount < result.length)
+            result = Arrays.copyOf(result, resultCount);
+
+        return new KeyRanges(result);
+    }
+
+    public KeyRanges mergeTouching()
+    {
+        if (ranges.length == 0)
+            return this;
+
+        KeyRange[] result = new KeyRange[ranges.length];
+        int count = copyAndMergeTouching(ranges, 0, result, 0, ranges.length);
+        if (count == result.length)
+            return this;
+        result = Arrays.copyOf(result, count);
+        return new KeyRanges(result);
+    }
+
+    /**
+     * Copies {@code srcCount} ranges from {@code src} (starting at {@code srcPosition}) into {@code trg}
+     * (starting at {@code trgPosition}), fusing each run of ranges whose end equals the next range's start
+     * into a single range.
+     *
+     * @return the number of ranges written to {@code trg}
+     */
+    private static int copyAndMergeTouching(KeyRange[] src, int srcPosition, KeyRange[] trg, int trgPosition, int srcCount)
+    {
+        if (srcCount == 0)
+            return 0;
+
+        int written = 0;
+        KeyRange runHead = src[srcPosition];
+        Key runEnd = runHead.end();
+        int limit = srcPosition + srcCount;
+        for (int i = srcPosition + 1 ; i < limit ; ++i)
+        {
+            KeyRange candidate = src[i];
+            boolean touches = runEnd.equals(candidate.start());
+            if (!touches)
+            {
+                // the run is over: flush it and start a new one at the candidate
+                trg[trgPosition + written] = maybeUpdateEnd(runHead, runEnd);
+                written++;
+                runHead = candidate;
+            }
+            runEnd = candidate.end();
+        }
+        // flush the final run
+        trg[trgPosition + written] = maybeUpdateEnd(runHead, runEnd);
+        return written + 1;
+    }
+
+    /**
+     * Returns {@code range} itself when {@code withEnd} is the very same object as its current end (an
+     * identity comparison), otherwise a sub-range sharing the start but ending at {@code withEnd}.
+     */
+    private static KeyRange maybeUpdateEnd(KeyRange range, Key withEnd)
+    {
+        if (withEnd == range.end())
+            return range;
+        return range.subRange(range.start(), withEnd);
+    }
+
+    public static KeyRanges of(KeyRange ... ranges)
+    {
+        if (ranges.length == 0)
+            return EMPTY;
+
+        return sortAndDeoverlap(ranges, ranges.length);
+    }
+
+    /**
+     * Sorts the first {@code count} entries of {@code ranges} by start key and then removes any overlap between
+     * them, so that the result is sorted and non-overlapping while covering the same keys as the input.
+     * A range entirely covered by its predecessor is dropped; a range partially covered is clipped so that it
+     * starts where its predecessor ends.
+     *
+     * The supplied array is sorted and compacted in place, and is wrapped directly (without copying) when
+     * every one of its slots is retained.
+     *
+     * @param ranges the ranges to normalise; modified by this call
+     * @param count  the number of leading entries of {@code ranges} to consider
+     */
+    private static KeyRanges sortAndDeoverlap(KeyRange[] ranges, int count)
+    {
+        if (count == 0)
+            return EMPTY;
+
+        if (count == 1)
+        {
+            if (ranges.length == 1)
+                return new KeyRanges(ranges);
+
+            return new KeyRanges(Arrays.copyOf(ranges, count));
+        }
+
+        Arrays.sort(ranges, 0, count, Comparator.comparing(KeyRange::start));
+        // prev is always the last range retained so far; its end is the furthest end seen
+        KeyRange prev = ranges[0];
+        int removed = 0;
+        for (int i = 1 ; i < count ; ++i)
+        {
+            KeyRange next = ranges[i];
+            if (prev.end().compareTo(next.start()) > 0)
+            {
+                if (prev.end().compareTo(next.end()) >= 0)
+                {
+                    // next contributes nothing beyond what is already covered
+                    removed++;
+                    continue;
+                }
+                // keep only the part of next that extends past prev
+                next = next.subRange(prev.end(), next.end());
+            }
+            // compact over any dropped slots, and always advance prev
+            ranges[i - removed] = prev = next;
+        }
+
+        count -= removed;
+        if (count != ranges.length)
+            ranges = Arrays.copyOf(ranges, count);
+
+        return new KeyRanges(ranges);
+    }
+
+    public static KeyRanges ofSortedAndDeoverlapped(KeyRange ... ranges)
+    {
+        for (int i = 1 ; i < ranges.length ; ++i)
+        {
+            if (ranges[i - 1].end().compareTo(ranges[i].start()) > 0)
+                throw new IllegalArgumentException(Arrays.toString(ranges) + " is not correctly sorted or deoverlapped");
+        }
+
+        return new KeyRanges(ranges);
+    }
+
+    static KeyRanges ofSortedAndDeoverlappedUnchecked(KeyRange ... ranges)
+    {
+        return new KeyRanges(ranges);
+    }
+
+    public static KeyRanges single(KeyRange range)
+    {
+        return new KeyRanges(new KeyRange[]{range});
+    }
+
+}
diff --git a/accord-core/src/main/java/accord/primitives/Keys.java b/accord-core/src/main/java/accord/primitives/Keys.java
new file mode 100644 (file)
index 0000000..302ba20
--- /dev/null
@@ -0,0 +1,318 @@
+package accord.primitives;
+
+import java.util.*;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import accord.api.Key;
+import accord.utils.*;
+import org.apache.cassandra.utils.concurrent.Inline;
+
+import static accord.utils.ArrayBuffers.cachedKeys;
+
+@SuppressWarnings("rawtypes")
+// TODO: this should probably be a BTree
+// TODO: check that foldl call-sites are inlined and optimised by HotSpot
+public class Keys implements Iterable<Key>
+{
+    public static final Keys EMPTY = new Keys(new Key[0]);
+
+    final Key[] keys;
+
+    public Keys(SortedSet<? extends Key> keys)
+    {
+        this.keys = keys.toArray(Key[]::new);
+    }
+
+    public Keys(Collection<? extends Key> keys)
+    {
+        this.keys = keys.toArray(Key[]::new);
+        Arrays.sort(this.keys);
+    }
+
+    /**
+     * Creates Keys with the sorted array.  This requires the caller knows that the keys are in fact sorted and should
+     * call {@link #of(Key[])} if it isn't known.
+     * @param keys sorted
+     */
+    private Keys(Key[] keys)
+    {
+        this.keys = keys;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Keys keys1 = (Keys) o;
+        return Arrays.equals(keys, keys1.keys);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Arrays.hashCode(keys);
+    }
+
+    public int indexOf(Key key)
+    {
+        return Arrays.binarySearch(keys, key);
+    }
+
+    public boolean contains(Key key)
+    {
+        return indexOf(key) >= 0;
+    }
+
+    public Key get(int indexOf)
+    {
+        return keys[indexOf];
+    }
+
+    public boolean isEmpty()
+    {
+        return keys.length == 0;
+    }
+
+    public int size()
+    {
+        return keys.length;
+    }
+
+    /**
+     * return true if this keys collection contains all keys found in the given keys
+     */
+    public boolean containsAll(Keys that)
+    {
+        if (that.isEmpty())
+            return true;
+
+        return foldlIntersect(that, (li, ri, k, p, v) -> v + 1, 0, 0, 0) == that.size();
+    }
+
+    public Keys union(Keys that)
+    {
+        return wrap(SortedArrays.linearUnion(keys, that.keys, cachedKeys()), that);
+    }
+
+    public Keys intersect(Keys that)
+    {
+        return wrap(SortedArrays.linearIntersection(keys, that.keys, cachedKeys()), that);
+    }
+
+    public Keys slice(KeyRanges ranges)
+    {
+        return wrap(SortedArrays.sliceWithMultipleMatches(keys, ranges.ranges, Key[]::new, (k, r) -> -r.compareTo(k), KeyRange::compareTo));
+    }
+
+    public int findNext(Key key, int startIndex)
+    {
+        return SortedArrays.exponentialSearch(keys, startIndex, keys.length, key);
+    }
+
+    public Keys with(Key key)
+    {
+        int insertPos = Arrays.binarySearch(keys, key);
+        if (insertPos >= 0)
+            return this;
+        insertPos = -1 - insertPos;
+
+        Key[] src = keys;
+        Key[] trg = new Key[src.length + 1];
+        System.arraycopy(src, 0, trg, 0, insertPos);
+        trg[insertPos] = key;
+        System.arraycopy(src, insertPos, trg, insertPos + 1, src.length - insertPos);
+        return new Keys(trg);
+    }
+
+    public Stream<Key> stream()
+    {
+        return Stream.of(keys);
+    }
+
+    @Override
+    public Iterator<Key> iterator()
+    {
+        return new Iterator<>()
+        {
+            int i = 0;
+            @Override
+            public boolean hasNext()
+            {
+                return i < keys.length;
+            }
+
+            @Override
+            public Key next()
+            {
+                return keys[i++];
+            }
+        };
+    }
+
+    public static Keys of(Key ... keys)
+    {
+        Arrays.sort(keys);
+        return new Keys(keys);
+    }
+
+    public static Keys ofSorted(Key ... keys)
+    {
+        for (int i = 1 ; i < keys.length ; ++i)
+        {
+            if (keys[i - 1].compareTo(keys[i]) >= 0)
+                throw new IllegalArgumentException(Arrays.toString(keys) + " is not sorted");
+        }
+        return new Keys(keys);
+    }
+
+    static Keys ofSortedUnchecked(Key ... keys)
+    {
+        return new Keys(keys);
+    }
+
+    public boolean any(KeyRanges ranges, Predicate<Key> predicate)
+    {
+        return 1 == foldl(ranges, (i1, key, i2, i3) -> predicate.test(key) ? 1 : 0, 0, 0, 1);
+    }
+
+    public boolean any(IndexedPredicate<Key> predicate)
+    {
+        return 1 == foldl((i, key, p, v) -> predicate.test(i, key) ? 1 : 0, 0, 0, 1);
+    }
+
+    /**
+     * Count the number of keys matching the predicate and intersecting with the given ranges.
+     * If terminateAfter is greater than 0, the method will return once terminateAfter matches are encountered
+     */
+    @Inline
+    public <V> V foldl(KeyRanges rs, IndexedFold<Key, V> fold, V accumulator)
+    {
+        int ai = 0, ri = 0;
+        while (true)
+        {
+            long ari = rs.findNextIntersection(ri, this, ai);
+            if (ari < 0)
+                break;
+
+            ai = (int)(ari >>> 32);
+            ri = (int)ari;
+            KeyRange range = rs.get(ri);
+            int nextai = range.nextHigherKeyIndex(this, ai + 1);
+            while (ai < nextai)
+            {
+                accumulator = fold.apply(ai, get(ai), accumulator);
+                ++ai;
+            }
+        }
+
+        return accumulator;
+    }
+
+    @Inline
+    public long foldl(KeyRanges rs, IndexedFoldToLong<Key> fold, long param, long initialValue, long terminalValue)
+    {
+        int ai = 0, ri = 0;
+        done: while (true)
+        {
+            long ari = rs.findNextIntersection(ri, this, ai);
+            if (ari < 0)
+                break;
+
+            ai = (int)(ari >>> 32);
+            ri = (int)ari;
+            KeyRange range = rs.get(ri);
+            int nextai = range.nextHigherKeyIndex(this, ai + 1);
+            while (ai < nextai)
+            {
+                initialValue = fold.apply(ai, get(ai), param, initialValue);
+                if (initialValue == terminalValue)
+                    break done;
+                ++ai;
+            }
+        }
+
+        return initialValue;
+    }
+
+    /**
+     * A fold variation permitting more efficient operation over indices only, by providing ranges of matching indices
+     */
+    @Inline
+    public long rangeFoldl(KeyRanges rs, IndexedRangeFoldToLong fold, long param, long initialValue, long terminalValue)
+    {
+        int ai = 0, ri = 0;
+        while (true)
+        {
+            long ari = rs.findNextIntersection(ri, this, ai);
+            if (ari < 0)
+                break;
+
+            ai = (int)(ari >>> 32);
+            ri = (int)ari;
+            KeyRange range = rs.get(ri);
+            int nextai = range.nextHigherKeyIndex(this, ai + 1);
+            initialValue = fold.apply(ai, nextai, param, initialValue);
+            if (initialValue == terminalValue)
+                break;
+            ai = nextai;
+        }
+
+        return initialValue;
+    }
+
+    @Inline
+    public long foldl(IndexedFoldToLong<Key> fold, long param, long initialValue, long terminalValue)
+    {
+        for (int i = 0; i < keys.length; i++)
+        {
+            initialValue = fold.apply(i, keys[i], param, initialValue);
+            if (terminalValue == initialValue)
+                return initialValue;
+        }
+        return initialValue;
+    }
+
+    /**
+     * A fold variation that intersects two key sets, invoking the fold function only on those
+     * items that are members of both sets (with their corresponding indices).
+     */
+    @Inline
+    public long foldlIntersect(Keys intersect, IndexedFoldIntersectToLong<Key> fold, long param, long initialValue, long terminalValue)
+    {
+        return SortedArrays.foldlIntersection(this.keys, intersect.keys, fold, param, initialValue, terminalValue);
+    }
+
+    /**
+     * A fold variation that invokes the fold function only on those items that are members of this set
+     * and NOT the provided set.
+     */
+    @Inline
+    public long foldlDifference(Keys subtract, IndexedFoldToLong<Key> fold, long param, long initialValue, long terminalValue)
+    {
+        return SortedArrays.foldlDifference(keys, subtract.keys, fold, param, initialValue, terminalValue);
+    }
+
+    private Keys wrap(Key[] wrap, Keys that)
+    {
+        return wrap == keys ? this : wrap == that.keys ? that : new Keys(wrap);
+    }
+
+    private Keys wrap(Key[] wrap)
+    {
+        return wrap == keys ? this : new Keys(wrap);
+    }
+
+    public static Keys union(Keys as, Keys bs)
+    {
+        return as == null ? bs : bs == null ? as : as.union(bs);
+    }
+
+    @Override
+    public String toString()
+    {
+        return stream().map(Object::toString).collect(Collectors.joining(",", "[", "]"));
+    }
+}
similarity index 98%
rename from accord-core/src/main/java/accord/txn/Timestamp.java
rename to accord-core/src/main/java/accord/primitives/Timestamp.java
index ef6b6d5542c024bfa562a71ebe1a4cbef5341867..22b3607aaf6f194b5baf5e5e73bab7e882af0c01 100644 (file)
@@ -1,4 +1,4 @@
-package accord.txn;
+package accord.primitives;
 
 import accord.local.Node.Id;
 
similarity index 93%
rename from accord-core/src/main/java/accord/txn/TxnId.java
rename to accord-core/src/main/java/accord/primitives/TxnId.java
index 752a59bd90114a5e59d048f33287c66e0099d59d..021844eccca26d46235b6013b843ba59aff69ed1 100644 (file)
@@ -1,4 +1,4 @@
-package accord.txn;
+package accord.primitives;
 
 import accord.local.Node.Id;
 
diff --git a/accord-core/src/main/java/accord/topology/KeyRanges.java b/accord-core/src/main/java/accord/topology/KeyRanges.java
deleted file mode 100644 (file)
index 3f87f88..0000000
+++ /dev/null
@@ -1,358 +0,0 @@
-package accord.topology;
-
-import accord.api.Key;
-import accord.txn.Keys;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-
-import java.util.*;
-
-public class KeyRanges implements Iterable<KeyRange>
-{
-    public static final KeyRanges EMPTY = new KeyRanges(new KeyRange[0]);
-
-    // TODO: fix raw parameterized use
-    private final KeyRange[] ranges;
-
-    public KeyRanges(KeyRange[] ranges)
-    {
-        Preconditions.checkNotNull(ranges);
-        this.ranges = ranges;
-    }
-
-    public KeyRanges(List<KeyRange> ranges)
-    {
-        this(ranges.toArray(KeyRange[]::new));
-    }
-
-    @Override
-    public String toString()
-    {
-        return Arrays.toString(ranges);
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        KeyRanges ranges1 = (KeyRanges) o;
-        return Arrays.equals(ranges, ranges1.ranges);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Arrays.hashCode(ranges);
-    }
-
-    @Override
-    public Iterator<KeyRange> iterator()
-    {
-        return Iterators.forArray(ranges);
-    }
-
-    public int rangeIndexForKey(int lowerBound, int upperBound, Key key)
-    {
-        return Arrays.binarySearch(ranges, lowerBound, upperBound, key,
-                                   (r, k) -> -((KeyRange) r).compareKey((Key) k));
-    }
-
-    public int rangeIndexForKey(Key key)
-    {
-        return rangeIndexForKey(0, ranges.length, key);
-    }
-
-    public boolean contains(Key key)
-    {
-        return rangeIndexForKey(key) >= 0;
-    }
-
-    public int size()
-    {
-        return ranges.length;
-    }
-
-    public KeyRange get(int i)
-    {
-        return ranges[i];
-    }
-
-    public boolean isEmpty()
-    {
-        return size() == 0;
-    }
-
-    public KeyRanges select(int[] indexes)
-    {
-        KeyRange[] selection = new KeyRange[indexes.length];
-        for (int i=0; i<indexes.length; i++)
-            selection[i] = ranges[indexes[i]];
-        return new KeyRanges(selection);
-    }
-
-    public KeyRanges intersection(Keys keys)
-    {
-        List<KeyRange<?>> result = null;
-
-        int keyLB = 0;
-        int keyHB = keys.size();
-        int rangeLB = 0;
-        int rangeHB = rangeIndexForKey(keys.get(keyHB-1));
-        rangeHB = rangeHB < 0 ? -1 - rangeHB : rangeHB + 1;
-
-        for (;rangeLB<rangeHB && keyLB<keyHB;)
-        {
-            Key key = keys.get(keyLB);
-            rangeLB = rangeIndexForKey(rangeLB, size(), key);
-
-            if (rangeLB < 0)
-            {
-                rangeLB = -1 -rangeLB;
-                if (rangeLB >= rangeHB)
-                    break;
-                keyLB = ranges[rangeLB].lowKeyIndex(keys, keyLB, keyHB);
-            }
-            else
-            {
-                if (result == null)
-                    result = new ArrayList<>(Math.min(rangeHB - rangeLB, keyHB - keyLB));
-                KeyRange<?> range = ranges[rangeLB];
-                result.add(range);
-                keyLB = range.higherKeyIndex(keys, keyLB, keyHB);
-                rangeLB++;
-            }
-
-            if (keyLB < 0)
-                keyLB = -1 - keyLB;
-        }
-
-        return result != null ? new KeyRanges(result.toArray(KeyRange[]::new)) : EMPTY;
-    }
-
-    public boolean intersects(Keys keys)
-    {
-        for (int i=0; i<ranges.length; i++)
-            if (ranges[i].intersects(keys))
-                return true;
-        return false;
-    }
-
-    public boolean intersects(KeyRange range)
-    {
-        for (int i=0; i<ranges.length; i++)
-            if (ranges[i].compareIntersecting(range) == 0)
-                return true;
-        return false;
-    }
-
-    public boolean intersects(KeyRanges ranges)
-    {
-        // TODO: efficiency
-        for (KeyRange thisRange : this.ranges)
-        {
-            for (KeyRange thatRange : ranges)
-            {
-                if (thisRange.intersects(thatRange))
-                    return true;
-            }
-        }
-        return false;
-    }
-
-    public int findFirstIntersecting(Keys keys)
-    {
-        for (int i=0; i<ranges.length; i++)
-        {
-            int lowKeyIndex = ranges[i].lowKeyIndex(keys);
-            if (lowKeyIndex >= 0)
-                return lowKeyIndex;
-        }
-        return -1;
-    }
-
-    /**
-     * Subtracts the given set of key ranges from this
-     * @param that
-     * @return
-     */
-    public KeyRanges difference(KeyRanges that)
-    {
-        List<KeyRange> result = new ArrayList<>(this.size() + that.size());
-        int thatIdx = 0;
-
-        for (int thisIdx=0; thisIdx<this.size(); thisIdx++)
-        {
-            KeyRange thisRange = this.ranges[thisIdx];
-            while (thatIdx < that.size())
-            {
-                KeyRange thatRange = that.ranges[thatIdx];
-
-                int cmp = thisRange.compareIntersecting(thatRange);
-                if (cmp > 0)
-                {
-                    thatIdx++;
-                    continue;
-                }
-                if (cmp < 0) break;
-
-                int scmp = thisRange.start().compareTo(thatRange.start());
-                int ecmp = thisRange.end().compareTo(thatRange.end());
-
-                if (scmp < 0)
-                    result.add(thisRange.subRange(thisRange.start(), thatRange.start()));
-
-                if (ecmp <= 0)
-                {
-                    thisRange = null;
-                    break;
-                }
-                else
-                {
-                    thisRange = thisRange.subRange(thatRange.end(), thisRange.end());
-                    thatIdx++;
-                }
-            }
-            if (thisRange != null)
-                result.add(thisRange);
-        }
-        return new KeyRanges(result.toArray(KeyRange[]::new));
-    }
-
-    /**
-     * Adds a set of non-overlapping ranges
-     */
-    public KeyRanges combine(KeyRanges that)
-    {
-        KeyRange[] combined = new KeyRange[this.ranges.length + that.ranges.length];
-        System.arraycopy(this.ranges, 0, combined, 0, this.ranges.length);
-        System.arraycopy(that.ranges, 0, combined, this.ranges.length, that.ranges.length);
-        Arrays.sort(combined, Comparator.comparing(KeyRange::start));
-
-        for (int i=1; i<combined.length; i++)
-            Preconditions.checkArgument(combined[i].compareIntersecting(combined[i -1]) != 0);
-
-        return new KeyRanges(combined);
-    }
-
-    public KeyRanges combine(KeyRange range)
-    {
-        return combine(new KeyRanges(new KeyRange[]{ range}));
-    }
-
-    private static KeyRange tryMerge(KeyRange range1, KeyRange range2)
-    {
-        if (range1 == null || range2 == null)
-            return null;
-        return range1.tryMerge(range2);
-    }
-
-    // optimised for case where one contains the other
-    public KeyRanges union(KeyRanges that)
-    {
-        int ai = 0, bi = 0;
-        KeyRange[] as = this.ranges, bs = that.ranges;
-        if (as.length < bs.length)
-        {
-            KeyRange[] tmp = as;
-            as = bs;
-            bs = tmp;
-        }
-
-        // TODO: this doesn't correctly handle as.length == bs.length if bs > as
-        while (ai < as.length && bi < bs.length)
-        {
-            KeyRange a = as[ai];
-            KeyRange b = bs[bi];
-            int c = a.compareIntersecting(b);
-            if (c < 0) ai++;
-            else if (c > 0 || !a.fullyContains(b)) break;
-            else bi++;
-        }
-
-        if (bi == bs.length)
-            return as == this.ranges ? this : that;
-
-        KeyRange[] result = new KeyRange[as.length + (bs.length - bi)];
-        System.arraycopy(as, 0, result, 0, ai);
-        int count = ai;
-
-        while (ai < as.length && bi < bs.length)
-        {
-            KeyRange a = as[ai];
-            KeyRange b = bs[bi];
-
-            int c = a.compareIntersecting(b);
-            if (c < 0)
-            {
-                result[count++] = a;
-                ai++;
-            }
-            else if (c > 0)
-            {
-                result[count++] = b;
-                bi++;
-            }
-            else
-            {
-                KeyRange merged = a.subRange(c < 0 ? a.start() : b.start(), a.end().compareTo(b.end()) > 0 ? a.end() : b.end());
-                ai++;
-                bi++;
-                while (ai < as.length || bi < bs.length)
-                {
-                    KeyRange min;
-                    if (ai == as.length) min = bs[bi];
-                    else if (bi == bs.length) min = a = as[ai];
-                    else min = as[ai].start().compareTo(bs[bi].start()) < 0 ? a = as[ai] : bs[bi];
-                    if (min.start().compareTo(merged.end()) > 0)
-                        break;
-                    if (min.end().compareTo(merged.end()) > 0)
-                        merged = merged.subRange(merged.start(), min.end());
-                    if (a == min) ai++;
-                    else bi++;
-                }
-                result[count++] = merged;
-            }
-        }
-
-        while (ai < as.length)
-            result[count++] = as[ai++];
-
-        while (bi < bs.length)
-            result[count++] = bs[bi++];
-
-        if (count < result.length)
-            result = Arrays.copyOf(result, count);
-
-        return new KeyRanges(result);
-    }
-
-    public KeyRanges mergeTouching()
-    {
-        if (ranges.length == 0)
-            return this;
-        List<KeyRange> result = new ArrayList<>(ranges.length);
-        KeyRange current = ranges[0];
-        for (int i=1; i<ranges.length; i++)
-        {
-            KeyRange merged = current.tryMerge(ranges[i]);
-            if (merged != null)
-            {
-                current = merged;
-            }
-            else
-            {
-                result.add(current);
-                current = ranges[i];
-            }
-        }
-        result.add(current);
-        return new KeyRanges(result.toArray(KeyRange[]::new));
-    }
-
-    public static KeyRanges singleton(KeyRange range)
-    {
-        return new KeyRanges(new KeyRange[]{range});
-    }
-
-}
index 4814e73059fd52ad77eb24a2b370339f587faafc..1062d13c0a3bcaf34d2d4a7c0b06393a532aff56 100644 (file)
@@ -7,6 +7,8 @@ import java.util.Set;
 
 import accord.local.Node.Id;
 import accord.api.Key;
+import accord.primitives.KeyRange;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
index 5bf2ef7ee040c9702da4288502dd55168de4dd9b..463f6056c75281fe538fa310183e874a4acb958a 100644 (file)
@@ -4,10 +4,11 @@ import java.util.*;
 import java.util.function.IntFunction;
 import java.util.stream.IntStream;
 
-import accord.local.CommandStores;
 import accord.local.Node.Id;
 import accord.api.Key;
-import accord.txn.Keys;
+import accord.primitives.KeyRange;
+import accord.primitives.KeyRanges;
+import accord.primitives.Keys;
 import accord.utils.IndexedConsumer;
 import accord.utils.IndexedBiFunction;
 import accord.utils.IndexedIntFunction;
@@ -44,7 +45,7 @@ public class Topology extends AbstractCollection<Shard>
     public Topology(long epoch, Shard... shards)
     {
         this.epoch = epoch;
-        this.ranges = new KeyRanges(Arrays.stream(shards).map(shard -> shard.range).toArray(KeyRange[]::new));
+        this.ranges = KeyRanges.ofSortedAndDeoverlapped(Arrays.stream(shards).map(shard -> shard.range).toArray(KeyRange[]::new));
         this.shards = shards;
         this.subsetOfRanges = ranges;
         this.supersetRangeIndexes = IntStream.range(0, shards.length).toArray();
@@ -168,7 +169,7 @@ public class Topology extends AbstractCollection<Shard>
             if (predicate.test(subsetIndex, shard))
                 newSubset[count++] = supersetIndex;
             // find the first key outside this range
-            i = shard.range.higherKeyIndex(select, i, select.size());
+            i = shard.range.nextHigherKeyIndex(select, i);
         }
         if (count != newSubset.length)
             newSubset = Arrays.copyOf(newSubset, count);
@@ -202,7 +203,7 @@ public class Topology extends AbstractCollection<Shard>
             Shard shard = shards[supersetIndex];
             accumulator = function.apply(subsetIndex, shard, accumulator);
             // find the first key outside this range
-            i = shard.range.higherKeyIndex(select, i, select.size());
+            i = shard.range.nextHigherKeyIndex(select, i);
         }
         return accumulator;
     }
index 4bcf1e3740ca2f8e3210e1cb203fab75771b8cdd..0c7a840baf6f5a034cde49e35c7eedf53b3b607e 100644 (file)
@@ -7,8 +7,8 @@ import accord.local.Node;
 import accord.messages.EpochRequest;
 import accord.messages.Request;
 import accord.topology.Topologies.Single;
-import accord.txn.Keys;
-import accord.txn.Timestamp;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
 import accord.txn.Txn;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
diff --git a/accord-core/src/main/java/accord/txn/Dependencies.java b/accord-core/src/main/java/accord/txn/Dependencies.java
deleted file mode 100644 (file)
index bc42dbc..0000000
+++ /dev/null
@@ -1,136 +0,0 @@
-package accord.txn;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.Objects;
-import java.util.TreeMap;
-
-import accord.api.Key;
-import accord.local.Command;
-import accord.local.CommandStore;
-import accord.topology.KeyRanges;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-// TODO: do not send Txn
-// TODO: implementation efficiency
-public class Dependencies implements Iterable<Entry<TxnId, Txn>>
-{
-    // TEMPORARY: this is used ONLY for SimpleProgressLog.blockingState which may not know the homeKey
-    // or be able to invalidate a transaction it cannot witness anywhere
-    public static class TxnAndHomeKey
-    {
-        public final Txn txn;
-        public final Key homeKey;
-
-        TxnAndHomeKey(Txn txn, Key homeKey)
-        {
-            this.txn = txn;
-            this.homeKey = homeKey;
-        }
-    }
-
-    // TODO: encapsulate
-    public final NavigableMap<TxnId, TxnAndHomeKey> deps;
-
-    public Dependencies()
-    {
-        this.deps = new TreeMap<>();
-    }
-
-    public Dependencies(NavigableMap<TxnId, TxnAndHomeKey> deps)
-    {
-        this.deps = deps;
-    }
-
-    public void add(Command command)
-    {
-        add(command.txnId(), command.txn(), command.homeKey());
-    }
-
-    @VisibleForTesting
-    public Dependencies add(TxnId txnId, Txn txn, Key homeKey)
-    {
-        Preconditions.checkState(homeKey != null);
-        deps.put(txnId, new TxnAndHomeKey(txn, homeKey));
-        return this;
-    }
-
-    public void addAll(Dependencies add)
-    {
-        this.deps.putAll(add.deps);
-    }
-
-    public void removeAll(Dependencies add)
-    {
-        this.deps.keySet().removeAll(add.deps.keySet());
-    }
-
-    public boolean contains(TxnId txnId)
-    {
-        return deps.containsKey(txnId);
-    }
-
-    public boolean isEmpty()
-    {
-        return deps.isEmpty();
-    }
-
-    public Txn get(TxnId txnId)
-    {
-        return deps.get(txnId).txn;
-    }
-
-    public Iterable<TxnId> on(CommandStore commandStore, Timestamp executeAt)
-    {
-        KeyRanges ranges = commandStore.ranges().since(executeAt.epoch);
-        if (ranges == null)
-            return Collections.emptyList();
-
-        return deps.entrySet()
-                .stream()
-                .filter(e -> commandStore.intersects(e.getValue().txn.keys(), ranges))
-                .map(Entry::getKey)::iterator;
-    }
-
-    public Key homeKey(TxnId txnId)
-    {
-        return deps.get(txnId).homeKey;
-    }
-
-    @Override
-    public Iterator<Entry<TxnId, Txn>> iterator()
-    {
-        return deps.entrySet().stream().map(e -> Map.entry(e.getKey(), e.getValue().txn)).iterator();
-    }
-
-    public int size()
-    {
-        return deps.size();
-    }
-
-    @Override
-    public String toString()
-    {
-        return deps.keySet().toString();
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        Dependencies entries = (Dependencies) o;
-        return deps.equals(entries.deps);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(deps);
-    }
-}
diff --git a/accord-core/src/main/java/accord/txn/Keys.java b/accord-core/src/main/java/accord/txn/Keys.java
deleted file mode 100644 (file)
index b09985c..0000000
+++ /dev/null
@@ -1,456 +0,0 @@
-package accord.txn;
-
-import java.util.*;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import accord.api.Key;
-import accord.topology.KeyRange;
-import accord.topology.KeyRanges;
-
-@SuppressWarnings("rawtypes")
-public class Keys implements Iterable<Key>
-{
-    public static final Keys EMPTY = new Keys(new Key[0]);
-
-    final Key[] keys;
-
-    public Keys(SortedSet<? extends Key> keys)
-    {
-        this.keys = keys.toArray(Key[]::new);
-    }
-
-    public Keys(Collection<? extends Key> keys)
-    {
-        this.keys = keys.toArray(Key[]::new);
-        Arrays.sort(this.keys);
-    }
-
-    public Keys(Key[] keys)
-    {
-        this.keys = keys;
-        Arrays.sort(keys);
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        Keys keys1 = (Keys) o;
-        return Arrays.equals(keys, keys1.keys);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Arrays.hashCode(keys);
-    }
-
-    public int indexOf(Key key)
-    {
-        return Arrays.binarySearch(keys, key);
-    }
-
-    public boolean contains(Key key)
-    {
-        return indexOf(key) >= 0;
-    }
-
-    public Key get(int indexOf)
-    {
-        return keys[indexOf];
-    }
-
-    public Keys select(int[] indexes)
-    {
-        Key[] selection = new Key[indexes.length];
-        for (int i = 0 ; i < indexes.length ; ++i)
-            selection[i] = keys[indexes[i]];
-        return new Keys(selection);
-    }
-
-    /**
-     * return true if this keys collection contains all keys found in the given keys
-     */
-    public boolean containsAll(Keys that)
-    {
-        if (isEmpty())
-            return that.isEmpty();
-
-        for (int thisIdx=0, thatIdx=0, thisSize=size(), thatSize=that.size();
-             thatIdx<thatSize;
-             thisIdx++, thatIdx++)
-        {
-            if (thisIdx >= thisSize)
-                return false;
-
-            Key thatKey = that.keys[thatIdx];
-            Key thisKey = this.keys[thisIdx];
-            int cmp = thisKey.compareTo(thatKey);
-
-            if (cmp == 0)
-                continue;
-
-            // if this key is greater that that key, we can't contain that key
-            if (cmp > 0)
-                return false;
-
-            // if search returns a positive index, a match was found and
-            // no further comparison is needed
-            thisIdx = Arrays.binarySearch(keys, thisIdx, thisSize, thatKey);
-            if (thisIdx < 0)
-                return false;
-        }
-
-        return true;
-    }
-
-    public Keys union(Keys that)
-    {
-        int thisIdx = 0;
-        int thatIdx = 0;
-        Key[] noOp = keys.length >= that.keys.length ? this.keys : that.keys;
-        Key[] result = noOp;
-        int resultSize = 0;
-
-        while (thisIdx < this.size() && thatIdx < that.size())
-        {
-            Key thisKey = this.keys[thisIdx];
-            Key thatKey = that.keys[thatIdx];
-            int cmp = thisKey.compareTo(thatKey);
-            Key minKey;
-            if (cmp == 0)
-            {
-                thisIdx++;
-                thatIdx++;
-                if (result == noOp)
-                    continue;
-                minKey = thisKey;
-            }
-            else if (cmp < 0)
-            {
-                thisIdx++;
-                if (result == keys)
-                    continue;
-                minKey = thisKey;
-                if (result == noOp)
-                {
-                    resultSize = thatIdx;
-                    result = new Key[resultSize + (keys.length - (thisIdx - 1)) + (that.keys.length - thatIdx)];
-                    System.arraycopy(that.keys, 0, result, 0, resultSize);
-                }
-            }
-            else
-            {
-                thatIdx++;
-                if (result == that.keys)
-                    continue;
-                minKey = thatKey;
-                if (result == noOp)
-                {
-                    resultSize = thisIdx;
-                    result = new Key[resultSize + (keys.length - thisIdx) + (that.keys.length - (thatIdx - 1))];
-                    System.arraycopy(this.keys, 0, result, 0, resultSize);
-                }
-            }
-            result[resultSize++] = minKey;
-        }
-
-        if (result == noOp)
-        {
-            if (noOp == keys && thatIdx == that.keys.length)
-                return this;
-            if (noOp == that.keys && thisIdx == keys.length)
-                return that;
-
-            resultSize = noOp == keys ? thisIdx : thatIdx;
-            result = new Key[resultSize + (keys.length - thisIdx) + (that.keys.length - thatIdx)];
-            System.arraycopy(noOp, 0, result, 0, resultSize);
-        }
-
-        while (thisIdx < this.size())
-            result[resultSize++] = this.keys[thisIdx++];
-
-        while (thatIdx < that.size())
-            result[resultSize++] = that.keys[thatIdx++];
-
-        if (resultSize < result.length)
-            result = Arrays.copyOf(result, resultSize);
-
-        return new Keys(result);
-    }
-
-    public Keys intersect(Keys that)
-    {
-        int thisIdx = 0;
-        int thatIdx = 0;
-        Key[] noOp = keys.length <= that.keys.length ? this.keys : that.keys;
-        Key[] result = noOp;
-        int resultSize = 0;
-
-        while (thisIdx < this.size() && thatIdx < that.size())
-        {
-            Key thisKey = this.keys[thisIdx];
-            Key thatKey = that.keys[thatIdx];
-            int cmp = thisKey.compareTo(thatKey);
-            if (cmp == 0)
-            {
-                thisIdx++;
-                thatIdx++;
-                if (result != noOp)
-                    result[resultSize] = thisKey;
-                resultSize++;
-            }
-            else
-            {
-                if (cmp < 0) thisIdx++;
-                else thatIdx++;
-                if (result == noOp)
-                {
-                    result = new Key[resultSize + Math.min(keys.length - thisIdx, that.keys.length - thatIdx)];
-                    System.arraycopy(noOp, 0, result, 0, resultSize);
-                }
-            }
-        }
-
-        if (result == noOp && resultSize == noOp.length)
-            return noOp == keys ? this : that;
-
-        if (resultSize < result.length)
-            result = Arrays.copyOf(result, resultSize);
-
-        return new Keys(result);
-    }
-
-    public boolean isEmpty()
-    {
-        return keys.length == 0;
-    }
-
-    public int size()
-    {
-        return keys.length;
-    }
-
-    public int search(int lowerBound, int upperBound, Object key, Comparator<Object> comparator)
-    {
-        return Arrays.binarySearch(keys, lowerBound, upperBound, key, comparator);
-    }
-
-    public int ceilIndex(int lowerBound, int upperBound, Key key)
-    {
-        int i = Arrays.binarySearch(keys, lowerBound, upperBound, key);
-        if (i < 0) i = -1 - i;
-        return i;
-    }
-
-    public int ceilIndex(Key key)
-    {
-        return ceilIndex(0, keys.length, key);
-    }
-
-    public Keys with(Key key)
-    {
-        int insertPos = Arrays.binarySearch(keys, key);
-        if (insertPos >= 0)
-            return this;
-        insertPos = -1 - insertPos;
-
-        Key[] src = keys;
-        Key[] trg = new Key[src.length + 1];
-        System.arraycopy(src, 0, trg, 0, insertPos);
-        trg[insertPos] = key;
-        System.arraycopy(src, insertPos, trg, insertPos + 1, src.length - insertPos);
-        return new Keys(trg);
-    }
-
-    public Stream<Key> stream()
-    {
-        return Stream.of(keys);
-    }
-
-    @Override
-    public Iterator<Key> iterator()
-    {
-        return new Iterator<>()
-        {
-            int i = 0;
-            @Override
-            public boolean hasNext()
-            {
-                return i < keys.length;
-            }
-
-            @Override
-            public Key next()
-            {
-                return keys[i++];
-            }
-        };
-    }
-
-    @Override
-    public String toString()
-    {
-        return stream().map(Object::toString).collect(Collectors.joining(",", "[", "]"));
-    }
-
-    public static Keys of(Key k0, Key... kn)
-    {
-        Key[] keys = new Key[kn.length + 1];
-        keys[0] = k0;
-        for (int i=0; i<kn.length; i++)
-            keys[i + 1] = kn[i];
-
-        return new Keys(keys);
-    }
-
-    public Keys intersect(KeyRanges ranges)
-    {
-        Key[] result = null;
-        int resultSize = 0;
-
-        int keyLB = 0;
-        int keyHB = size();
-        int rangeLB = 0;
-        int rangeHB = ranges.rangeIndexForKey(keys[keyHB-1]);
-        rangeHB = rangeHB < 0 ? -1 - rangeHB : rangeHB + 1;
-
-        for (;rangeLB<rangeHB && keyLB<keyHB;)
-        {
-            Key key = keys[keyLB];
-            rangeLB = ranges.rangeIndexForKey(rangeLB, ranges.size(), key);
-
-            if (rangeLB < 0)
-            {
-                rangeLB = -1 -rangeLB;
-                if (rangeLB >= rangeHB)
-                    break;
-                keyLB = ranges.get(rangeLB).lowKeyIndex(this, keyLB, keyHB);
-            }
-            else
-            {
-                if (result == null)
-                    result = new Key[keyHB - keyLB];
-                KeyRange<?> range = ranges.get(rangeLB);
-                int highKey = range.higherKeyIndex(this, keyLB, keyHB);
-                int size = highKey - keyLB;
-                System.arraycopy(keys, keyLB, result, resultSize, size);
-                keyLB = highKey;
-                resultSize += size;
-                rangeLB++;
-            }
-
-            if (keyLB < 0)
-                keyLB = -1 - keyLB;
-        }
-
-        if (result != null && resultSize < result.length)
-            result = Arrays.copyOf(result, resultSize);
-
-        return result != null ? new Keys(result) : EMPTY;
-    }
-
-    public interface KeyFold<V>
-    {
-        V fold(Key key, V value);
-    }
-
-    /**
-     * Count the number of keys matching the predicate and intersecting with the given ranges.
-     * If terminateAfter is greater than 0, the method will return once terminateAfter matches are encountered
-     */
-    public <V> V foldl(KeyRanges ranges, KeyFold<V> fold, V accumulator)
-    {
-        int keyLB = 0;
-        int keyHB = size();
-        int rangeLB = 0;
-        int rangeHB = ranges.rangeIndexForKey(keys[keyHB-1]);
-        rangeHB = rangeHB < 0 ? -1 - rangeHB : rangeHB + 1;
-
-        for (;rangeLB<rangeHB && keyLB<keyHB;)
-        {
-            Key key = keys[keyLB];
-            rangeLB = ranges.rangeIndexForKey(rangeLB, rangeHB, key);
-
-            if (rangeLB < 0)
-            {
-                rangeLB = -1 -rangeLB;
-                if (rangeLB >= rangeHB)
-                    break;
-                keyLB = ranges.get(rangeLB).lowKeyIndex(this, keyLB, keyHB);
-            }
-            else
-            {
-                KeyRange<?> range = ranges.get(rangeLB);
-                int highKey = range.higherKeyIndex(this, keyLB, keyHB);
-
-                for (int i=keyLB; i<highKey; i++)
-                    accumulator = fold.fold(keys[i], accumulator);
-
-                keyLB = highKey;
-                rangeLB++;
-            }
-
-            if (keyLB < 0)
-                keyLB = -1 - keyLB;
-        }
-
-        return accumulator;
-    }
-
-    public boolean any(KeyRanges ranges, Predicate<Key> predicate)
-    {
-        return 1 == foldl(ranges, (key, i1, i2) -> predicate.test(key) ? 1 : 0, 0, 0, 1);
-    }
-
-    public interface FoldKeysToLong
-    {
-        long apply(Key key, long param, long prev);
-    }
-
-    public long foldl(KeyRanges ranges, FoldKeysToLong fold, long param, long initialValue, long terminalValue)
-    {
-        int keyLB = 0;
-        int keyHB = size();
-        int rangeLB = 0;
-        int rangeHB = ranges.rangeIndexForKey(keys[keyHB-1]);
-        rangeHB = rangeHB < 0 ? -1 - rangeHB : rangeHB + 1;
-
-        for (;rangeLB<rangeHB && keyLB<keyHB;)
-        {
-            Key key = keys[keyLB];
-            rangeLB = ranges.rangeIndexForKey(rangeLB, ranges.size(), key);
-
-            if (rangeLB < 0)
-            {
-                rangeLB = -1 -rangeLB;
-                if (rangeLB >= rangeHB)
-                    break;
-                keyLB = ranges.get(rangeLB).lowKeyIndex(this, keyLB, keyHB);
-            }
-            else
-            {
-                KeyRange<?> range = ranges.get(rangeLB);
-                int highKey = range.higherKeyIndex(this, keyLB, keyHB);
-
-                for (int i=keyLB; i<highKey; i++)
-                {
-                    initialValue = fold.apply(keys[i], param, initialValue);
-                    if (terminalValue == initialValue)
-                        return initialValue;
-                }
-
-                keyLB = highKey;
-                rangeLB++;
-            }
-
-            if (keyLB < 0)
-                keyLB = -1 - keyLB;
-        }
-
-        return initialValue;
-    }
-}
index 4e6fec6600f7e0d84f696a4e7a0cf885e4e81ba3..ed138f7ec80d2a0b1a53eea0432c3d2d1213c5b9 100644 (file)
@@ -4,10 +4,12 @@ import java.util.Objects;
 
 import accord.api.*;
 import accord.local.*;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
 
 public class Txn
 {
-    enum Kind { READ, WRITE, RECONFIGURE }
+    enum Kind { READ, WRITE }
 
     final Kind kind;
     public final Keys keys;
@@ -50,21 +52,12 @@ public class Txn
 
     public boolean isWrite()
     {
-        switch (kind)
-        {
-            default:
-                throw new IllegalStateException();
-            case READ:
-                return false;
-            case WRITE:
-            case RECONFIGURE:
-                return true;
-        }
+        return kind == Kind.WRITE;
     }
 
     public Result result(Data data)
     {
-        return query.compute(data);
+        return query.compute(data, read, update);
     }
 
     public Writes execute(Timestamp executeAt, Data data)
@@ -87,7 +80,7 @@ public class Txn
 
     public Data read(Command command, Keys keys)
     {
-        return keys.foldl(command.commandStore.ranges().at(command.executeAt().epoch), (key, accumulate) -> {
+        return keys.foldl(command.commandStore.ranges().at(command.executeAt().epoch), (index, key, accumulate) -> {
             CommandStore commandStore = command.commandStore;
             if (!commandStore.hashIntersects(key))
                 return accumulate;
index b919ad41e103f97fc92b51b4c4cfa238e93c3f27..4ddc2055db08068e2d61c5b5b837e50ce1ac5d03 100644 (file)
@@ -2,7 +2,9 @@ package accord.txn;
 
 import accord.api.Write;
 import accord.local.CommandStore;
-import accord.topology.KeyRanges;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
+import accord.primitives.KeyRanges;
 
 public class Writes
 {
@@ -26,7 +28,7 @@ public class Writes
         if (ranges == null)
             return;
 
-        keys.foldl(ranges, (key, accumulate) -> {
+        keys.foldl(ranges, (index, key, accumulate) -> {
             if (commandStore.hashIntersects(key))
                 write.apply(key, executeAt, commandStore.store());
             return accumulate;
diff --git a/accord-core/src/main/java/accord/utils/ArrayBuffers.java b/accord-core/src/main/java/accord/utils/ArrayBuffers.java
new file mode 100644 (file)
index 0000000..7279db1
--- /dev/null
@@ -0,0 +1,552 @@
+package accord.utils;
+
+import accord.api.Key;
+import accord.primitives.TxnId;
+import com.google.common.base.Preconditions;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.function.IntFunction;
+
+/**
+ * A set of utility classes and interfaces for managing a collection of buffers for arrays of certain types.
+ *
+ * These buffers are designed to be used to combine simple one-shot methods that consume and produce one or more arrays
+ * with methods that may (or may not) call them repeatedly. Specifically, {@link accord.primitives.Deps#linearUnion},
+ * {@link SortedArrays#linearUnion} and {@link SortedArrays#linearIntersection}
+ *
+ * To support this efficiently and ergonomically for users of the one-shot methods, the cache management must
+ * support fetching buffers for re-use, but also returning either the buffer that was used (in the case where we
+ * intend to re-invoke this or another method with the buffer as input), or a properly sized final output array
+ * if the result of the method is to be consumed immediately.
+ *
+ * This functionality is implemented in {@link ObjectBuffers#complete(Object[], int)} and {@link IntBuffers#complete(int[], int)}
+ * which may either shrink the output array, or capture the size and return the buffer.
+ *
+ * Since these methods also may return either of their inputs, which may themselves be buffers, we support capturing
+ * the size of the input we have returned via {@link ObjectBuffers#completeWithExisting(Object[], int)}.
+ */
+public class ArrayBuffers
+{
+    private static final boolean FULLY_UNCACHED = true;
+
+    // TODO: we should periodically clear the thread locals to ensure we aren't slowly accumulating unnecessarily large objects on every thread
+    private static final ThreadLocal<IntBufferCache> INTS = ThreadLocal.withInitial(() -> new IntBufferCache(4, 1 << 14));
+    private static final ThreadLocal<ObjectBufferCache<Key>> KEYS = ThreadLocal.withInitial(() -> new ObjectBufferCache<>(3, 1 << 9, Key[]::new));
+    private static final ThreadLocal<ObjectBufferCache<TxnId>> TXN_IDS = ThreadLocal.withInitial(() -> new ObjectBufferCache<>(3, 1 << 12, TxnId[]::new));
+
+    public static IntBuffers cachedInts()
+    {
+        return INTS.get();
+    }
+
+    public static ObjectBuffers<Key> cachedKeys()
+    {
+        return KEYS.get();
+    }
+
+    public static ObjectBuffers<TxnId> cachedTxnIds()
+    {
+        return TXN_IDS.get();
+    }
+
+    public static <T> ObjectBuffers<T> uncached(IntFunction<T[]> allocator) { return new UncachedObjectBuffers<>(allocator); }
+
+    public static IntBuffers uncachedInts() { return UncachedIntBuffers.INSTANCE; }
+
+    public interface IntBufferAllocator
+    {
+        /**
+         * Return an {@code int[]} of size at least {@code minSize}, possibly from a pool
+         */
+        int[] getInts(int minSize);
+    }
+
+    public interface IntBuffers extends IntBufferAllocator
+    {
+        /**
+         * To be invoked on the result buffer with the number of elements contained;
+         * either the buffer will be returned and the size optionally captured, or else the result may be
+         * shrunk to the size of the contents, depending on implementation.
+         */
+        int[] complete(int[] buffer, int size);
+
+        /**
+         * The buffer is no longer needed by the caller, which is discarding the array;
+         * if {@link #complete(int[], int)} returned the buffer as its result this buffer should NOT be
+         * returned to any pool.
+         *
+         * Note that this method assumes {@link #complete(int[], int)} was invoked on this buffer previously.
+         * However, it is guaranteed that a failure to do so does not leak memory or pool space, only produces some
+         * additional garbage.
+         *
+         * @return true if the buffer is discarded (and discard-able), false if it was retained or is believed to be in use
+         */
+        boolean discard(int[] buffer, int size);
+
+        /**
+         * Indicate this buffer is definitely unused, and return it to a pool if possible
+         * @return true if the buffer is discarded (and discard-able), false if it was retained
+         */
+        boolean forceDiscard(int[] buffer);
+    }
+
+    public interface ObjectBuffers<T>
+    {
+        /**
+         * Return an {@code T[]} of size at least {@code minSize}, possibly from a pool
+         */
+        T[] get(int minSize);
+
+        /**
+         * To be invoked on the result buffer with the number of elements contained;
+         * either the buffer will be returned and the size optionally captured, or else the result may be
+         * shrunk to the size of the contents, depending on implementation.
+         */
+        T[] complete(T[] buffer, int size);
+
+        /**
+         * To be invoked on an input buffer that constitutes the result, with the number of elements it contained;
+         * either the buffer will be returned and the size optionally captured, or else the result may be
+         * shrunk to the size of the contents, depending on implementation.
+         */
+        T[] completeWithExisting(T[] buffer, int size);
+
+        /**
+         * The buffer is no longer needed by the caller, which is discarding the array;
+         * if {@link #complete(Object[], int)} returned the buffer as its result, this buffer should NOT be
+         * returned to any pool.
+         *
+         * Note that this method assumes {@link #complete(Object[], int)} was invoked on this buffer previously.
+         * However, it is guaranteed that a failure to do so does not leak memory or pool space, only produces some
+         * additional garbage.
+         *
+         * Note also that {@code size} should represent the size of the used space in the array, even if it was later
+         * truncated.
+         *
+         * @return true if the buffer is discarded (and discard-able), false if it was retained or is believed to be in use
+         */
+        boolean discard(T[] buffer, int size);
+
+        /**
+         * Indicate this buffer is definitely unused, and return it to a pool if possible
+         *
+         * Note that {@code size} should represent the size of the used space in the array, even if it was later truncated.
+         *
+         * @return true if the buffer is discarded (and discard-able), false if it was retained
+         */
+        boolean forceDiscard(T[] buffer, int size);
+
+        /**
+         * Returns the {@code size} parameter provided to the most recent {@link #complete(Object[], int)} or {@link #completeWithExisting(Object[], int)}
+         *
+         * Depending on implementation, this is either saved from the last such invocation, or else simply returns the size of the buffer parameter.
+         */
+        int lengthOfLast(T[] buffer);
+    }
+
+    private static final class UncachedIntBuffers implements IntBuffers
+    {
+        static final UncachedIntBuffers INSTANCE = new UncachedIntBuffers();
+        private UncachedIntBuffers()
+        {
+        }
+
+        @Override
+        public int[] getInts(int minSize)
+        {
+            return new int[minSize];
+        }
+
+        @Override
+        public int[] complete(int[] buffer, int size)
+        {
+            if (size == buffer.length)
+                return buffer;
+
+            return Arrays.copyOf(buffer, size);
+        }
+
+        @Override
+        public boolean discard(int[] buffer, int size)
+        {
+            return forceDiscard(buffer);
+        }
+
+        @Override
+        public boolean forceDiscard(int[] buffer)
+        {
+            // if FULLY_UNCACHED we want our caller to also not cache us, so we indicate the buffer has been retained
+            return !FULLY_UNCACHED;
+        }
+    }
+
+    private static final class UncachedObjectBuffers<T> implements ObjectBuffers<T>
+    {
+        final IntFunction<T[]> allocator;
+        private UncachedObjectBuffers(IntFunction<T[]> allocator)
+        {
+            this.allocator = allocator;
+        }
+
+        @Override
+        public T[] get(int minSize)
+        {
+            return allocator.apply(minSize);
+        }
+
+        @Override
+        public T[] complete(T[] buffer, int size)
+        {
+            if (size == buffer.length)
+                return buffer;
+
+            return Arrays.copyOf(buffer, size);
+        }
+
+        @Override
+        public T[] completeWithExisting(T[] buffer, int size)
+        {
+            Preconditions.checkArgument(buffer.length == size);
+            return buffer;
+        }
+
+        @Override
+        public int lengthOfLast(T[] buffer)
+        {
+            return buffer.length;
+        }
+
+        @Override
+        public boolean discard(T[] buffer, int size)
+        {
+            return forceDiscard(buffer, size);
+        }
+
+        @Override
+        public boolean forceDiscard(T[] buffer, int size)
+        {
+            // if FULLY_UNCACHED we want our caller to also not cache us, so we indicate the buffer has been retained
+            return !FULLY_UNCACHED;
+        }
+    }
+
+    /**
+     * A very simple cache that simply stores the largest {@code maxCount} arrays smaller than {@code maxSize}.
+     * Works on both primitive and Object arrays.
+     */
+    private static abstract class AbstractBufferCache<B>
+    {
+        interface Clear<B>
+        {
+            void clear(B array, int usedSize);
+        }
+
+        final IntFunction<B> allocator;
+        final Clear<B> clear;
+        final B empty;
+        final B[] cached;
+        final int maxSize;
+
+        AbstractBufferCache(IntFunction<B> allocator, Clear<B> clear, int maxCount, int maxSize)
+        {
+            this.allocator = allocator;
+            this.maxSize = maxSize;
+            this.cached = (B[])new Object[maxCount];
+            this.empty = allocator.apply(0);
+            this.clear = clear;
+        }
+
+        B getInternal(int minSize) // returns an array of length >= minSize, re-using a cached one when possible
+        {
+            if (minSize == 0)
+                return empty;
+            // arrays above maxSize are never cached (discardInternal rejects them), so allocate directly
+            if (minSize > maxSize)
+                return allocator.apply(minSize);
+            // first fit: hand out the first cached array that is large enough, and vacate its slot
+            for (int i = 0 ; i < cached.length ; ++i)
+            {
+                if (cached[i] != null && Array.getLength(cached[i]) >= minSize)
+                {
+                    B result = cached[i];
+                    cached[i] = null;
+                    return result;
+                }
+            }
+            // nothing cached is large enough
+            return allocator.apply(minSize);
+        }
+
+        boolean discardInternal(B buffer, int bufferSize, int usedSize, boolean force) // true if dropped; false if cached or assumed in use
+        {
+            if (bufferSize > maxSize)
+                return true;
+            // unforced and exactly full: complete() hands such a buffer back as the result, so assume it is still in use
+            if (bufferSize == usedSize && !force)
+                return false;
+            // take the first slot that is empty or holds a smaller array; any array displaced from the slot is dropped
+            for (int i = 0 ; i < cached.length ; ++i)
+            {
+                if (cached[i] == null || Array.getLength(cached[i]) < bufferSize)
+                {
+                    clear.clear(buffer, usedSize);
+                    cached[i] = buffer;
+                    return false;
+                }
+            }
+            // every slot already holds an array at least as large
+            return true;
+        }
+    }
+
+    public static class IntBufferCache extends AbstractBufferCache<int[]> implements IntBuffers
+    {
+        IntBufferCache(int maxCount, int maxSize)
+        {
+            super(int[]::new, (i1, i2) -> {}, maxCount, maxSize);
+        }
+
+        @Override
+        public int[] complete(int[] buffer, int size)
+        {
+            if (size == buffer.length)
+                return buffer;
+
+            return Arrays.copyOf(buffer, size);
+        }
+
+        @Override
+        public boolean discard(int[] buffer, int size)
+        {
+            return discardInternal(buffer, buffer.length, size, false);
+        }
+
+        @Override
+        public boolean forceDiscard(int[] buffer)
+        {
+            return discardInternal(buffer, buffer.length, -1, true);
+        }
+
+        @Override
+        public int[] getInts(int minSize)
+        {
+            return getInternal(minSize);
+        }
+    }
+
+    /**
+     * Caches {@code T[]} buffers; the used prefix of an array is nulled before it is cached, so the cache holds no references to its former contents.
+     */
+    public static class ObjectBufferCache<T> extends AbstractBufferCache<T[]> implements ObjectBuffers<T>
+    {
+        ObjectBufferCache(int maxCount, int maxSize, IntFunction<T[]> allocator)
+        {
+            super(allocator, (array, usedSize) -> Arrays.fill(array, 0, usedSize, null), maxCount, maxSize);
+        }
+
+        @Override
+        public T[] complete(T[] buffer, int size)
+        {
+            return size == buffer.length ? buffer : Arrays.copyOf(buffer, size); // an exactly-full buffer is itself the result
+        }
+
+        @Override
+        public T[] completeWithExisting(T[] buffer, int size)
+        {
+            return buffer; // size is not recorded: lengthOfLast reports buffer.length
+        }
+
+        @Override
+        public int lengthOfLast(T[] buffer)
+        {
+            return buffer.length;
+        }
+
+        @Override
+        public boolean discard(T[] buffer, int size)
+        {
+            return discardInternal(buffer, buffer.length, size, false);
+        }
+
+        @Override
+        public boolean forceDiscard(T[] buffer, int size)
+        {
+            return discardInternal(buffer, buffer.length, size, true);
+        }
+
+        @Override
+        public T[] get(int minSize)
+        {
+            return getInternal(minSize);
+        }
+    }
+
+    /**
+     * Returns the buffer to the caller, saving the length if necessary
+     */
+    public static class PassThroughObjectBuffers<T> implements ObjectBuffers<T>
+    {
+        final ObjectBuffers<T> objs;
+        T[] savedObjs; // permit saving of precisely one unused buffer of any size to assist LinearMerge
+        int length = -1;
+
+        public PassThroughObjectBuffers(ObjectBuffers<T> objs)
+        {
+            this.objs = objs;
+        }
+
+        @Override
+        public T[] get(int minSize)
+        {
+            length = -1;
+            if (savedObjs != null && savedObjs.length >= minSize)
+            {
+                T[] result = savedObjs;
+                savedObjs = null;
+                return result;
+            }
+            return objs.get(minSize);
+        }
+
+        @Override
+        public T[] complete(T[] buffer, int size)
+        {
+            length = size;
+            return buffer;
+        }
+
+        @Override
+        public T[] completeWithExisting(T[] buffer, int size)
+        {
+            length = size;
+            return buffer;
+        }
+
+        /**
+         * Invoke {@link #complete(Object[], int)} on the wrapped ObjectBuffers
+         */
+        public T[] realComplete(T[] buffer, int size)
+        {
+            return objs.complete(buffer, size);
+        }
+
+        @Override
+        public boolean discard(T[] buffer, int size)
+        {
+            return true; // no-op here (see realDiscard). NOTE(review): PassThroughObjectAndIntBuffers#discard(int[], int) returns false for the same no-op - confirm which is intended
+        }
+
+        @Override
+        public boolean forceDiscard(T[] buffer, int size)
+        {
+            length = -1;
+            if (!objs.forceDiscard(buffer, size))
+                return false;
+
+            return discardInternal(buffer);
+        }
+
+        /**
+         * Invoke {@link #discard(Object[], int)} on the wrapped ObjectBuffers
+         */
+        public void realDiscard(T[] buffer, int size)
+        {
+            length = -1;
+            if (!objs.discard(buffer, size))
+                return;
+
+            discardInternal(buffer);
+        }
+
+        private boolean discardInternal(T[] buffer) // as ObjectBuffers#forceDiscard: true if discarded, false if retained
+        {
+            if (savedObjs != null && savedObjs.length >= buffer.length)
+                return true; // already holding a buffer at least as large, so this one is dropped
+            // keep this buffer for a later get(): it is retained, so report false (matches forceDiscard(int[]) in the int variant)
+            savedObjs = buffer;
+            return false;
+        }
+
+        public int lengthOfLast(T[] buffer)
+        {
+            if (length == -1)
+                throw new IllegalStateException("Attempted to get last length but no call to complete called");
+            return length;
+        }
+    }
+
+    /**
+     * As {@link PassThroughObjectBuffers}, and also returns {@code int[]} buffers to the caller as-is (no length is saved for these)
+     */
+    public static class PassThroughObjectAndIntBuffers<T> extends PassThroughObjectBuffers<T> implements IntBuffers
+    {
+        final IntBuffers ints;
+        int[] savedInts;
+
+        public PassThroughObjectAndIntBuffers(ObjectBuffers<T> objs, IntBuffers ints)
+        {
+            super(objs);
+            this.ints = ints;
+        }
+
+        @Override
+        public int[] getInts(int minSize)
+        {
+            if (savedInts != null && savedInts.length >= minSize)
+            {
+                int[] result = savedInts;
+                savedInts = null;
+                return result;
+            }
+
+            return ints.getInts(minSize);
+        }
+
+        @Override
+        public int[] complete(int[] buffer, int size)
+        {
+            return buffer;
+        }
+
+        @Override
+        public boolean discard(int[] buffer, int size)
+        {
+            return false;
+        }
+
+        @Override
+        public boolean forceDiscard(int[] buffer)
+        {
+            if (!ints.forceDiscard(buffer))
+                return false;
+
+            if (savedInts != null && savedInts.length >= buffer.length)
+                return true;
+
+            savedInts = buffer;
+            return false;
+        }
+
+        public int[] realComplete(int[] buffer, int size)
+        {
+            return ints.complete(buffer, size);
+        }
+
+        /**
+         * Pass-through the discard
+         */
+        public void realDiscard(int[] buffer, int size)
+        {
+            if (!ints.discard(buffer, size))
+                return;
+
+            if (savedInts != null && savedInts.length >= buffer.length)
+                return;
+
+            savedInts = buffer;
+            return;
+        }
+    }
+
+}
diff --git a/accord-core/src/main/java/accord/utils/AsymmetricComparator.java b/accord-core/src/main/java/accord/utils/AsymmetricComparator.java
new file mode 100644 (file)
index 0000000..cabbb98
--- /dev/null
@@ -0,0 +1,14 @@
+package accord.utils;
+
+/**
+ * Compares a value of one type against a value of another type.
+ *
+ * Similar to Cassandra's AsymmetricOrdering, only this declares a single abstract method, so an instance
+ * can be created from a static lambda function.
+ */
+@FunctionalInterface
+public interface AsymmetricComparator<T1, T2>
+{
+    /** NOTE(review): presumably follows the sign convention of {@link java.util.Comparator#compare} - confirm against callers */
+    int compare(T1 t1, T2 t2);
+}
diff --git a/accord-core/src/main/java/accord/utils/IndexedFold.java b/accord-core/src/main/java/accord/utils/IndexedFold.java
new file mode 100644 (file)
index 0000000..b53a678
--- /dev/null
@@ -0,0 +1,13 @@
+package accord.utils;
+
+/**
+ * A single step of a fold over a collection of keys: receives a key, an index for it, and the value
+ * accumulated so far, and returns the new accumulated value.
+ *
+ * NOTE(review): presumably {@code index} is the key's position in the collection being folded - confirm against callers.
+ */
+@FunctionalInterface
+public interface IndexedFold<K, V>
+{
+    V apply(int index, K key, V value);
+}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java b/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java
new file mode 100644 (file)
index 0000000..9853f46
--- /dev/null
@@ -0,0 +1,14 @@
+package accord.utils;
+
+/**
+ * A single step of a fold that accumulates a primitive {@code long}: receives a key with two indexes for it,
+ * a {@code long} parameter and the previously computed value, and returns the new value.
+ *
+ * NOTE(review): presumably the key occurs in two collections, at {@code leftIndex} and {@code rightIndex}
+ * respectively, and {@code param} is passed unchanged on every step - confirm against callers.
+ */
+@FunctionalInterface
+public interface IndexedFoldIntersectToLong<K>
+{
+    long apply(int leftIndex, int rightIndex, K key, long param, long prev);
+}
diff --git a/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java b/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java
new file mode 100644 (file)
index 0000000..8fc56bd
--- /dev/null
@@ -0,0 +1,14 @@
+package accord.utils;
+
+/**
+ * A single step of a fold that accumulates a primitive {@code long}: receives a key and an index for it,
+ * a {@code long} parameter and the previously computed value, and returns the new value.
+ *
+ * NOTE(review): presumably {@code index} is the key's position in the collection being folded and
+ * {@code param} is passed unchanged on every step - confirm against callers.
+ */
+@FunctionalInterface
+public interface IndexedFoldToLong<K>
+{
+    long apply(int index, K key, long param, long prev);
+}
diff --git a/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java b/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java
new file mode 100644 (file)
index 0000000..c3a198e
--- /dev/null
@@ -0,0 +1,14 @@
+package accord.utils;
+
+/**
+ * A single step of a fold that accumulates a primitive {@code long}: receives a pair of {@code int} bounds,
+ * a {@code long} parameter and the previously computed value, and returns the new value.
+ *
+ * NOTE(review): presumably {@code from} and {@code to} delimit a run of indexes in the collection being folded;
+ * whether {@code to} is inclusive or exclusive is not visible here - confirm against callers.
+ */
+@FunctionalInterface
+public interface IndexedRangeFoldToLong
+{
+    long apply(int from, int to, long param, long prev);
+}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/utils/IntrusiveLinkedList.java b/accord-core/src/main/java/accord/utils/IntrusiveLinkedList.java
deleted file mode 100644 (file)
index 43579d3..0000000
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.utils;
-
-import java.util.Iterator;
-import java.util.Spliterator;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-import static java.util.Spliterators.spliteratorUnknownSize;
-
-/**
- * TODO: copied from Cassandra; extract shared utilities library
- *
- * A simple intrusive double-linked list for maintaining a list of tasks,
- * useful for invalidating queued ordered tasks
- */
-@SuppressWarnings("unchecked")
-public class IntrusiveLinkedList<O extends IntrusiveLinkedListNode> extends IntrusiveLinkedListNode
-{
-    public IntrusiveLinkedList()
-    {
-        prev = next = this;
-    }
-
-    public void add(O add)
-    {
-        assert add.prev == null && add.next == null;
-        IntrusiveLinkedListNode after = this;
-        IntrusiveLinkedListNode before = prev;
-        add.next = after;
-        add.prev = before;
-        before.next = add;
-        after.prev = add;
-    }
-
-    public O poll()
-    {
-        if (isEmpty())
-            return null;
-
-        IntrusiveLinkedListNode next = this.next;
-        next.remove();
-        return (O) next;
-    }
-
-    public boolean isEmpty()
-    {
-        return next == this;
-    }
-
-    public Stream<O> stream()
-    {
-        Iterator<O> iterator = new Iterator<O>()
-        {
-            IntrusiveLinkedListNode next = IntrusiveLinkedList.this.next;
-
-            @Override
-            public boolean hasNext()
-            {
-                return next != IntrusiveLinkedList.this;
-            }
-
-            @Override
-            public O next()
-            {
-                O result = (O)next;
-                next = next.next;
-                return result;
-            }
-        };
-
-        return StreamSupport.stream(spliteratorUnknownSize(iterator, Spliterator.IMMUTABLE), false);
-    }
-}
-
diff --git a/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java b/accord-core/src/main/java/accord/utils/IntrusiveLinkedListNode.java
deleted file mode 100644 (file)
index a021a5c..0000000
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.utils;
-
-/**
- * TODO: copied from Cassandra; extract shared utilities library
- */
-public abstract class IntrusiveLinkedListNode
-{
-    IntrusiveLinkedListNode prev;
-    IntrusiveLinkedListNode next;
-
-    protected boolean isFree()
-    {
-        return next == null;
-    }
-
-    protected void remove()
-    {
-        if (next != null)
-        {
-            prev.next = next;
-            next.prev = prev;
-            next = null;
-            prev = null;
-        }
-    }
-}
diff --git a/accord-core/src/main/java/accord/utils/MergeIterator.java b/accord-core/src/main/java/accord/utils/MergeIterator.java
new file mode 100644 (file)
index 0000000..e803657
--- /dev/null
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.utils;
+
+import com.google.common.collect.AbstractIterator;
+
+import java.util.*;
+
+/** Merges sorted input iterators which individually contain unique items. */
+public abstract class MergeIterator<In,Out> extends AbstractIterator<Out>
+{
+    protected final Reducer<In,Out> reducer;
+    protected final List<? extends Iterator<In>> iterators;
+
+    protected MergeIterator(List<? extends Iterator<In>> iters, Reducer<In, Out> reducer)
+    {
+        this.iterators = iters;
+        this.reducer = reducer;
+    }
+
+    /**
+     * Builds the cheapest merge iterator able to serve {@code sources}.
+     *
+     * @param sources    the sorted input iterators
+     * @param comparator the ordering shared by all inputs; only consulted when merging more than one source
+     * @param reducer    combines the items that compare equal into one output
+     * @return a heap-based merger unless there is exactly one source, in which case a pass-through iterator that
+     *         applies (or, if {@code reducer.trivialReduceIsTrivial()}, bypasses) the reducer
+     */
+    @SuppressWarnings("resource")
+    public static <In, Out> MergeIterator<In, Out> get(List<? extends Iterator<In>> sources,
+                                                       Comparator<? super In> comparator,
+                                                       Reducer<In, Out> reducer)
+    {
+        // anything other than exactly one source goes through the heap-based merge
+        if (sources.size() != 1)
+            return new ManyToOne<>(sources, comparator, reducer);
+
+        // a single source needs no comparisons; skip the reducer as well when it declares itself trivial
+        if (reducer.trivialReduceIsTrivial())
+            return new TrivialOneToOne<>(sources, reducer);
+
+        return new OneToOne<>(sources, reducer);
+    }
+
+    public Iterable<? extends Iterator<In>> iterators()
+    {
+        return iterators;
+    }
+
+    /**
+     * Closes every source iterator that implements {@link AutoCloseable}, then closes the reducer.
+     *
+     * A failure while closing one source does not stop the remaining sources, or the reducer, from being closed:
+     * the first failure is remembered, any later failures are attached to it as suppressed exceptions, and it is
+     * rethrown (wrapped) only after everything has been given the chance to close.
+     *
+     * @throws RuntimeException wrapping the first exception thrown by a source's {@code close()}; if no source
+     *         failed, any exception thrown by the reducer's {@code close()} propagates unchanged
+     */
+    public void close()
+    {
+        Exception failure = null;
+        for (int i=0, isize=iterators.size(); i<isize; i++)
+        {
+            Iterator<In> iterator = iterators.get(i);
+            try
+            {
+                if (iterator instanceof AutoCloseable)
+                    ((AutoCloseable)iterator).close();
+            }
+            catch (Exception e)
+            {
+                // keep going so the remaining sources are not leaked; report everything together at the end
+                if (failure == null)
+                    failure = e;
+                else
+                    failure.addSuppressed(e);
+            }
+        }
+
+        try
+        {
+            reducer.close();
+        }
+        catch (RuntimeException e)
+        {
+            if (failure == null)
+                throw e;
+            failure.addSuppressed(e);
+        }
+
+        if (failure != null)
+            throw new RuntimeException(failure);
+    }
+
+    /**
+     * A MergeIterator that consumes multiple input values per output value.
+     *
+     * The most straightforward way to implement this is to use a {@code PriorityQueue} of iterators, {@code poll} it to
+     * find the next item to consume, then {@code add} the iterator back after advancing. This is not very efficient as
+     * {@code poll} and {@code add} in all cases require at least {@code log(size)} comparisons (usually more than
+     * {@code 2*log(size)}) per consumed item, even if the input is suitable for fast iteration.
+     *
+     * The implementation below makes use of the fact that replacing the top element in a binary heap can be done much
+     * more efficiently than separately removing it and placing it back, especially in the cases where the top iterator
+     * is to be used again very soon (e.g. when there are large sections of the output where only a limited number of
+     * input iterators overlap, which is normally the case in many practically useful situations, e.g. levelled
+     * compaction). To further improve this particular scenario, we also use a short sorted section at the start of the
+     * queue.
+     *
+     * The heap is laid out as this (for {@code SORTED_SECTION_SIZE == 2}):
+     *                 0
+     *                 |
+     *                 1
+     *                 |
+     *                 2
+     *               /   \
+     *              3     4
+     *             / \   / \
+     *             5 6   7 8
+     *            .. .. .. ..
+     * Where each line is a <= relationship.
+     *
+     * In the sorted section we can advance with a single comparison per level, while advancing a level within the heap
+     * requires two (so that we can find the lighter element to pop up).
+     * The sorted section adds a constant overhead when data is uniformly distributed among the iterators, but may up
+     * to halve the iteration time when one iterator is dominant over sections of the merged data (as is the case with
+     * non-overlapping iterators).
+     *
+     * The iterator is further complicated by the need to avoid advancing the input iterators until an output is
+     * actually requested. To achieve this {@code consume} walks the heap to find equal items without advancing the
+     * iterators, and {@code advance} moves them and restores the heap structure before any items can be consumed.
+     * 
+     * To avoid having to do additional comparisons in consume to identify the equal items, we keep track of equality
+     * between children and their parents in the heap. More precisely, the lines in the diagram above define the
+     * following relationship:
+     *   parent <= child && (parent == child) == child.equalParent
+     * We can track, make use of and update the equalParent field without any additional comparisons.
+     *
+     * For more formal definitions and proof of correctness, see CASSANDRA-8915.
+     */
+    static final class ManyToOne<In,Out> extends MergeIterator<In,Out>
+    {
+        protected final Candidate<In>[] heap;
+
+        /** Number of non-exhausted iterators. */
+        int size;
+
+        /**
+         * Position of the deepest, right-most child that needs advancing before we can start consuming.
+         * Because advancing changes the values of the items of each iterator, the parent-chain from any position
+         * in this range that needs advancing is not in correct order. The trees rooted at any position that does
+         * not need advancing, however, retain their prior-held binary heap property.
+         */
+        int needingAdvance;
+
+        /**
+         * The number of elements to keep in order before the binary heap starts, exclusive of the top heap element.
+         */
+        static final int SORTED_SECTION_SIZE = 4;
+
+        /**
+         * Wraps each input iterator in a {@link Candidate}; none is advanced here. Every slot is flagged as
+         * needing advance, so the heap is actually built on the first call to {@code advance()}.
+         */
+        public ManyToOne(List<? extends Iterator<In>> iters, Comparator<? super In> comp, Reducer<In, Out> reducer)
+        {
+            super(iters, reducer);
+
+            final int count = iters.size();
+
+            @SuppressWarnings("unchecked")
+            Candidate<In>[] candidates = new Candidate[count];
+            for (int i = 0; i < count; i++)
+                candidates[i] = new Candidate<>(i, iters.get(i), comp);
+
+            this.heap = candidates;
+            this.size = count;
+            this.needingAdvance = count;
+        }
+
+        protected final Out computeNext()
+        {
+            advance();
+            return consume();
+        }
+
+        /**
+         * Advance all iterators that need to be advanced and place them into suitable positions in the heap.
+         *
+         * By walking the iterators backwards we know that everything after the point being processed already forms
+         * correctly ordered subheaps, thus we can build a subheap rooted at the current position by only sinking down
+         * the newly advanced iterator. Because all parents of a consumed iterator are also consumed there is no way
+         * that we can process one consumed iterator but skip over its parent.
+         *
+         * The procedure is the same as the one used for the initial building of a heap in the heapsort algorithm and
+         * has a maximum number of comparisons {@code (2 * log(size) + SORTED_SECTION_SIZE / 2)} multiplied by the
+         * number of iterators whose items were consumed at the previous step, but is also at most linear in the size of
+         * the heap if the number of consumed elements is high (as it is in the initial heap construction). With non- or
+         * lightly-overlapping iterators the procedure finishes after just one (resp. a couple of) comparisons.
+         */
+        private void advance()
+        {
+            // Turn the set of candidates into a heap.
+            for (int i = needingAdvance - 1; i >= 0; --i)
+            {
+                Candidate<In> candidate = heap[i];
+                /**
+                 *  needingAdvance runs to the maximum index (and deepest-right node) that may need advancing;
+                 *  since the equal items that were consumed at-once may occur in sub-heap "veins" of equality,
+                 *  not all items above this deepest-right position may have been consumed; these already form
+                 *  valid sub-heaps and can be skipped-over entirely
+                 */
+                if (candidate.needsAdvance())
+                    replaceAndSink(candidate.advance(), i);
+            }
+        }
+
+        /**
+         * Consume all items that sort like the current top of the heap. As we cannot advance the iterators to let
+         * equivalent items pop up, we walk the heap to find them and mark them as needing advance.
+         *
+         * This relies on the equalParent flag to avoid doing any comparisons.
+         *
+         * @return {@code endOfData()} if no iterators remain, otherwise the reducer's result for the consumed items
+         */
+        private Out consume()
+        {
+            if (size == 0)
+                return endOfData();
+
+            reducer.onKeyChange();
+            assert !heap[0].equalParent;
+            heap[0].consume(reducer);
+            final int size = this.size;
+            final int sortedSectionSize = Math.min(size, SORTED_SECTION_SIZE);
+            int i;
+            // labelled block: breaking out of it skips the binary-heap walk at its end
+            consume: {
+                // sorted section: stop at the first entry that is not flagged equal to its parent
+                for (i = 1; i < sortedSectionSize; ++i)
+                {
+                    if (!heap[i].equalParent)
+                        break consume;
+                    heap[i].consume(reducer);
+                }
+                // every entry visited above was equal; continue into the binary heap starting at i.
+                // consumeHeap returns -1 when nothing there matched, which leaves i unchanged
+                i = Math.max(i, consumeHeap(i) + 1);
+            }
+            // one past the highest index consumed; advance() scans indexes below this value
+            needingAdvance = i;
+            return reducer.getReduced();
+        }
+
+        /**
+         * Recursively consume the item at position idx, and those in the binary subheap below it, for as long as
+         * each item's equalParent flag is set. No comparisons are performed; only the flag is consulted.
+         *
+         * @param idx position of the subheap root to examine
+         * @return the largest index consumed by this search, or -1 if idx is outside the heap or the item at idx
+         *         is not flagged as equal to its parent
+         */
+        private int consumeHeap(int idx)
+        {
+            if (idx >= size || !heap[idx].equalParent)
+                return -1;
+
+            heap[idx].consume(reducer);
+            // index of the left child; the right child is at nextIdx + 1
+            int nextIdx = (idx << 1) - (SORTED_SECTION_SIZE - 1);
+            return Math.max(idx, Math.max(consumeHeap(nextIdx), consumeHeap(nextIdx + 1)));
+        }
+
+        /**
+         * Replace an iterator in the heap with the given position and move it down the heap until it finds its proper
+         * position, pulling lighter elements up the heap.
+         *
+         * Whenever an equality is found between two elements that form a new parent-child relationship, the child's
+         * equalParent flag is set to true if the elements are equal.
+         *
+         * @param candidate the freshly advanced candidate to place, or null if its iterator is exhausted, in which
+         *                  case the last element of the heap is placed instead and the heap shrinks by one
+         * @param currIdx   the position at which to start sinking
+         */
+        private void replaceAndSink(Candidate<In> candidate, int currIdx)
+        {
+            if (candidate == null)
+            {
+                // Drop iterator by replacing it with the last one in the heap.
+                candidate = heap[--size];
+                heap[size] = null; // not necessary but helpful for debugging
+            }
+            // The new element will be top of its heap, at this point there is no parent to be equal to.
+            candidate.equalParent = false;
+
+            final int size = this.size;
+            final int sortedSectionSize = Math.min(size - 1, SORTED_SECTION_SIZE);
+
+            int nextIdx;
+
+            // Advance within the sorted section, pulling up items lighter than candidate.
+            while ((nextIdx = currIdx + 1) <= sortedSectionSize)
+            {
+                if (!heap[nextIdx].equalParent) // if we were greater than an (or were the) equal parent, we are >= the child
+                {
+                    int cmp = candidate.compareTo(heap[nextIdx]);
+                    if (cmp <= 0)
+                    {
+                        heap[nextIdx].equalParent = cmp == 0;
+                        heap[currIdx] = candidate;
+                        return;
+                    }
+                }
+
+                // the child is lighter than (or equal-flagged below) the candidate: pull it up one slot
+                heap[currIdx] = heap[nextIdx];
+                currIdx = nextIdx;
+            }
+            // If size <= SORTED_SECTION_SIZE, nextIdx below will be no less than size,
+            // because currIdx == sortedSectionSize == size - 1 and nextIdx becomes
+            // ((size - 1) * 2) - (size - 1 - 1) == size.
+
+            // Advance in the binary heap, pulling up the lighter element from the two at each level.
+            while ((nextIdx = (currIdx * 2) - (sortedSectionSize - 1)) + 1 < size)
+            {
+                if (!heap[nextIdx].equalParent)
+                {
+                    if (!heap[nextIdx + 1].equalParent)
+                    {
+                        // pick the smallest of the two children
+                        int siblingCmp = heap[nextIdx + 1].compareTo(heap[nextIdx]);
+                        if (siblingCmp < 0)
+                            ++nextIdx;
+
+                        // if we're smaller than this, we are done, and must only restore the heap and equalParent properties
+                        int cmp = candidate.compareTo(heap[nextIdx]);
+                        if (cmp <= 0)
+                        {
+                            if (cmp == 0)
+                            {
+                                heap[nextIdx].equalParent = true;
+                                if (siblingCmp == 0) // siblingCmp == 0 => nextIdx is the left child
+                                    heap[nextIdx + 1].equalParent = true;
+                            }
+
+                            heap[currIdx] = candidate;
+                            return;
+                        }
+
+                        if (siblingCmp == 0)
+                        {
+                            // siblingCmp == 0 => nextIdx is still the left child
+                            // if the two siblings were equal, and we are inserting something greater, we will
+                            // pull up the left one; this means the right gets an equalParent
+                            heap[nextIdx + 1].equalParent = true;
+                        }
+                    }
+                    else
+                        ++nextIdx;  // descend down the path where we found the equal child
+                }
+
+                // pull the chosen child up and continue sinking from its old position
+                heap[currIdx] = heap[nextIdx];
+                currIdx = nextIdx;
+            }
+
+            // our loop guard ensures there are always two siblings to process; typically when we exit the loop we will
+            // be well past the end of the heap and this next condition will match...
+            if (nextIdx >= size)
+            {
+                heap[currIdx] = candidate;
+                return;
+            }
+
+            // ... but sometimes we will have one last child to compare against, that has no siblings
+            if (!heap[nextIdx].equalParent)
+            {
+                int cmp = candidate.compareTo(heap[nextIdx]);
+                if (cmp <= 0)
+                {
+                    heap[nextIdx].equalParent = cmp == 0;
+                    heap[currIdx] = candidate;
+                    return;
+                }
+            }
+
+            // the lone child sorts before the candidate: swap them
+            heap[currIdx] = heap[nextIdx];
+            heap[nextIdx] = candidate;
+        }
+    }
+
+    /**
+     * Holds, and is comparable by, the head item of an iterator it owns.
+     *
+     * The held item is null both before the first {@link #advance()} and after {@link #consume}; in either state
+     * {@link #needsAdvance()} returns true and the candidate must not be compared.
+     */
+    protected static final class Candidate<In> implements Comparable<Candidate<In>>
+    {
+        private final Iterator<? extends In> iter;
+        private final Comparator<? super In> comp;
+        /** Index of the source iterator, passed to the reducer alongside each item. */
+        private final int idx;
+        /** Current head item; null when it has been consumed (or not yet fetched). */
+        private In item;
+        /** Maintained by the owning heap: true if this candidate's item compared equal to its parent's. */
+        boolean equalParent;
+
+        public Candidate(int idx, Iterator<? extends In> iter, Comparator<? super In> comp)
+        {
+            this.iter = iter;
+            this.comp = comp;
+            this.idx = idx;
+        }
+
+        /** @return this if our iterator had an item, and it is now available, otherwise null */
+        protected Candidate<In> advance()
+        {
+            if (!iter.hasNext())
+                return null;
+
+            item = iter.next();
+            return this;
+        }
+
+        /** Compares the two head items using the comparator supplied at construction; both must be present. */
+        @Override
+        public int compareTo(Candidate<In> that)
+        {
+            assert this.item != null && that.item != null;
+            return comp.compare(this.item, that.item);
+        }
+
+        /**
+         * Hands the head item to {@code reducer} and clears it, marking this candidate as needing advance.
+         *
+         * @param reducer the accumulator to feed; parameterised (rather than raw) so the call is type-checked
+         */
+        public void consume(Reducer<In, ?> reducer)
+        {
+            reducer.reduce(idx, item);
+            item = null;
+        }
+
+        /** @return true if there is no head item available, i.e. {@link #advance()} must be called before use */
+        public boolean needsAdvance()
+        {
+            return item == null;
+        }
+    }
+
+    /**
+     * Accumulator that collects values of type {@code In}, and outputs a value of type {@code Out}.
+     *
+     * The merge iterators in this file call {@link #onKeyChange()} once, then {@link #reduce} for each input item
+     * that belongs to the same output, and finally {@link #getReduced()} to obtain that output.
+     */
+    public static abstract class Reducer<In,Out>
+    {
+        /**
+         * @return true if Out is the same as In for the case of a single source iterator; when true, a single-source
+         *         merge returns the source's items directly without invoking this reducer
+         */
+        public boolean trivialReduceIsTrivial()
+        {
+            return false;
+        }
+
+        /**
+         * combine this object with the previous ones.
+         * intermediate state is up to your implementation.
+         *
+         * @param idx     index of the source iterator that produced {@code current}
+         * @param current the item to fold into the pending result
+         */
+        public abstract void reduce(int idx, In current);
+
+        /** @return The last object computed by reduce */
+        protected abstract Out getReduced();
+
+        /**
+         * Called at the beginning of each new key, before any reduce is called.
+         * To be overridden by implementing classes.
+         */
+        protected void onKeyChange() {}
+
+        /**
+         * May be overridden by implementations that require cleaning up after use
+         */
+        public void close() {}
+    }
+
+    /** Single-source variant: no comparisons are needed, but every item is still passed through the reducer. */
+    private static class OneToOne<In, Out> extends MergeIterator<In, Out>
+    {
+        private final Iterator<In> source;
+
+        public OneToOne(List<? extends Iterator<In>> sources, Reducer<In, Out> reducer)
+        {
+            super(sources, reducer);
+            this.source = sources.get(0);
+        }
+
+        @Override
+        protected Out computeNext()
+        {
+            if (source.hasNext())
+            {
+                // same call sequence the multi-source merge uses for each output: key change, reduce, read result
+                reducer.onKeyChange();
+                In only = source.next();
+                reducer.reduce(0, only);
+                return reducer.getReduced();
+            }
+            return endOfData();
+        }
+    }
+
+    /** Single-source variant that bypasses the reducer entirely and returns the source's items cast to Out. */
+    private static class TrivialOneToOne<In, Out> extends MergeIterator<In, Out>
+    {
+        private final Iterator<In> source;
+
+        public TrivialOneToOne(List<? extends Iterator<In>> sources, Reducer<In, Out> reducer)
+        {
+            super(sources, reducer);
+            this.source = sources.get(0);
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        protected Out computeNext()
+        {
+            // the unchecked cast is what lets the reducer be skipped altogether
+            return source.hasNext() ? (Out) source.next() : endOfData();
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/utils/SortedArrays.java b/accord-core/src/main/java/accord/utils/SortedArrays.java
new file mode 100644 (file)
index 0000000..88bd5d0
--- /dev/null
@@ -0,0 +1,817 @@
+package accord.utils;
+
+import java.util.Arrays;
+import java.util.function.IntFunction;
+
+import accord.utils.ArrayBuffers.ObjectBuffers;
+import accord.utils.ArrayBuffers.IntBufferAllocator;
+import org.apache.cassandra.utils.concurrent.Inline;
+
+import javax.annotation.Nullable;
+
+import static accord.utils.ArrayBuffers.uncached;
+
+public class SortedArrays
+{
+    /**
+     * {@link #linearUnion(Comparable[], int, Comparable[], int, ObjectBuffers)}
+     */
+    public static <T extends Comparable<? super T>> T[] linearUnion(T[] left, T[] right, IntFunction<T[]> allocator)
+    {
+        return linearUnion(left, right, uncached(allocator));
+    }
+
+    /**
+     * {@link #linearUnion(Comparable[], int, Comparable[], int, ObjectBuffers)}
+     */
+    public static <T extends Comparable<? super T>> T[] linearUnion(T[] left, T[] right, ObjectBuffers<T> buffers)
+    {
+        return linearUnion(left, left.length, right, right.length, buffers);
+    }
+
+    /**
+     * Given two sorted buffers where the contents within each array are unique, but may duplicate each other,
+     * return a sorted array containing the result of merging the two input buffers.
+     *
+     * If one of the two input buffers represents a superset of the other, this buffer will be returned unmodified.
+     *
+     * Otherwise, depending on {@code buffers}, a result buffer may itself be returned or a new array.
+     *
+     * TODO: introduce exponential search optimised version
+     * TODO: also compare with Hwang and Lin algorithm
+     * TODO: could also compare with a recursive partitioning scheme like quicksort
+     * (note that dual exponential search is also an optimal algorithm, just seemingly ignored by the literature,
+     * and may be in practice faster for lists that are more often similar in size, and only occasionally very different.
+     * Without performing extensive analysis, exponential search likely has higher constant factors in terms of the
+     * constant multiplier on number of comparisons performed, but lower constant factors for managing the algorithm state
+     * unless we implemented the static Hwang and Lin that does not re-assess the relative sizes of the remaining inputs)
+     *
+     * We could also improve performance with instruction parallelism, by e.g. merging the front and backs of the
+     * two input arrays independently, copying to the front/back of each buffer. Since most results must be array-copied
+     * to be minimised this would only be costlier in situations where we are returning the output buffer for re-use,
+     * and it would not be much costlier.
+     */
+    public static <T extends Comparable<? super T>> T[] linearUnion(T[] left, int leftLength, T[] right, int rightLength, ObjectBuffers<T> buffers)
+    {
+        int leftIdx = 0;
+        int rightIdx = 0;
+
+        T[] result = null;
+        int resultSize = 0;
+
+        // first, pick the superset candidate and merge the two until we find the first missing item
+        // if none found, return the superset candidate
+        if (leftLength >= rightLength)
+        {
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+                if (cmp <= 0)
+                {
+                    leftIdx += 1;
+                    rightIdx += cmp == 0 ? 1 : 0;
+                }
+                else
+                {
+                    // right holds an item that left lacks, so left cannot be returned as-is.
+                    // Everything in right[0..rightIdx) is already contained in left[0..leftIdx), so the union can
+                    // hold at most all of left plus the not-yet-visited remainder of right: size the buffer to
+                    // exactly that, as the result == null path below does
+                    resultSize = leftIdx;
+                    result = buffers.get(leftLength + (rightLength - rightIdx));
+                    System.arraycopy(left, 0, result, 0, resultSize);
+                    result[resultSize++] = right[rightIdx++];
+                    break;
+                }
+            }
+
+            if (result == null)
+            {
+                if (rightIdx == rightLength) // all elements matched, so can return the other array
+                    return buffers.completeWithExisting(left, leftLength);
+                // no elements matched or only a subset matched
+                result = buffers.get(leftLength + (rightLength - rightIdx));
+                resultSize = leftIdx;
+                System.arraycopy(left, 0, result, 0, resultSize);
+            }
+        }
+        else
+        {
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+                if (cmp >= 0)
+                {
+                    rightIdx += 1;
+                    leftIdx += cmp == 0 ? 1 : 0;
+                }
+                else
+                {
+                    // mirror image of the branch above, with right as the superset candidate
+                    resultSize = rightIdx;
+                    result = buffers.get(rightLength + (leftLength - leftIdx));
+                    System.arraycopy(right, 0, result, 0, resultSize);
+                    result[resultSize++] = left[leftIdx++];
+                    break;
+                }
+            }
+
+            if (result == null)
+            {
+                if (leftIdx == leftLength) // all elements matched, so can return the other array
+                    return buffers.completeWithExisting(right, rightLength);
+                // no elements matched or only a subset matched
+                result = buffers.get(rightLength + (leftLength - leftIdx));
+                resultSize = rightIdx;
+                System.arraycopy(right, 0, result, 0, resultSize);
+            }
+        }
+
+        try
+        {
+            // plain two-way merge of whatever remains, emitting items present in both inputs only once
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+                T minKey;
+                if (cmp == 0)
+                {
+                    leftIdx++;
+                    rightIdx++;
+                    minKey = leftKey;
+                }
+                else if (cmp < 0)
+                {
+                    leftIdx++;
+                    minKey = leftKey;
+                }
+                else
+                {
+                    rightIdx++;
+                    minKey = rightKey;
+                }
+                result[resultSize++] = minKey;
+            }
+
+            // at most one of the inputs still has items; append them
+            while (leftIdx < leftLength)
+                result[resultSize++] = left[leftIdx++];
+
+            while (rightIdx < rightLength)
+                result[resultSize++] = right[rightIdx++];
+
+            return buffers.complete(result, resultSize);
+        }
+        finally
+        {
+            // always hand the working buffer back to the provider, whether or not the merge completed
+            buffers.discard(result, resultSize);
+        }
+    }
+
+    /**
+     * {@link #linearIntersection(Comparable[], int, Comparable[], int, ObjectBuffers)}
+     */
+    public static <T extends Comparable<? super T>> T[] linearIntersection(T[] left, T[] right, IntFunction<T[]> allocator)
+    {
+        return linearIntersection(left, right, uncached(allocator));
+    }
+
+    /**
+     * {@link #linearIntersection(Comparable[], int, Comparable[], int, ObjectBuffers)}
+     */
+    public static <T extends Comparable<? super T>> T[] linearIntersection(T[] left, T[] right, ObjectBuffers<T> buffers)
+    {
+        return linearIntersection(left, left.length, right, right.length, buffers);
+    }
+
+    /**
+     * Given two sorted buffers where the contents within each array are unique, but may duplicate each other,
+     * return a sorted array containing the elements present in both input buffers.
+     *
+     * If one of the two input buffers represents a subset of the other, this buffer will be returned unmodified.
+     *
+     * Otherwise, depending on {@code buffers}, a result buffer may itself be returned or a new array.
+     *
+     * TODO: introduce exponential search optimised version
+     */
+    public static <T extends Comparable<? super T>> T[] linearIntersection(T[] left, int leftLength, T[] right, int rightLength, ObjectBuffers<T> buffers)
+    {
+        int leftIdx = 0;
+        int rightIdx = 0;
+
+        T[] result = null;
+        int resultSize = 0;
+
+        // first pick a subset candidate, and merge both until we encounter an element not present in the other array
+        if (leftLength <= rightLength)
+        {
+            boolean hasMatch = false;
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+                if (cmp >= 0)
+                {
+                    rightIdx += 1;
+                    leftIdx += cmp == 0 ? 1 : 0;
+                    if (cmp == 0)
+                        hasMatch = true;
+                }
+                else
+                {
+                    // left[leftIdx] is absent from right: keep the matched prefix and skip this item
+                    resultSize = leftIdx++;
+                    result = buffers.get(resultSize + Math.min(leftLength - leftIdx, rightLength - rightIdx));
+                    System.arraycopy(left, 0, result, 0, resultSize);
+                    break;
+                }
+            }
+
+            // Reaching here without a result means one input ran out. leftIdx only advances on a match, so
+            // left[0..leftIdx) is exactly the intersection; any later items of left were never matched (right was
+            // exhausted first) and must not be returned, hence leftIdx rather than leftLength
+            if (result == null)
+                return hasMatch ? buffers.completeWithExisting(left, leftIdx) : buffers.complete(buffers.get(0), 0);
+        }
+        else
+        {
+            boolean hasMatch = false;
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+                if (cmp <= 0)
+                {
+                    leftIdx += 1;
+                    rightIdx += cmp == 0 ? 1 : 0;
+                    if (cmp == 0)
+                        hasMatch = true;
+                }
+                else
+                {
+                    // right[rightIdx] is absent from left: keep the matched prefix and skip this item
+                    resultSize = rightIdx++;
+                    result = buffers.get(resultSize + Math.min(leftLength - leftIdx, rightLength - rightIdx));
+                    System.arraycopy(right, 0, result, 0, resultSize);
+                    break;
+                }
+            }
+
+            // mirror image of the case above: only right[0..rightIdx) is known to be present in left
+            if (result == null)
+                return hasMatch ? buffers.completeWithExisting(right, rightIdx) : buffers.complete(buffers.get(0), 0);
+        }
+
+        try
+        {
+
+            // plain two-way merge of the remainder, keeping only items found in both inputs
+            while (leftIdx < leftLength && rightIdx < rightLength)
+            {
+                T leftKey = left[leftIdx];
+                T rightKey = right[rightIdx];
+                int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+                if (cmp == 0)
+                {
+                    leftIdx++;
+                    rightIdx++;
+                    result[resultSize++] = leftKey;
+                }
+                else if (cmp < 0) leftIdx++;
+                else rightIdx++;
+            }
+
+            return buffers.complete(result, resultSize);
+        }
+        finally
+        {
+            // always hand the working buffer back to the provider, whether or not the merge completed
+            buffers.discard(result, resultSize);
+        }
+    }
+
+    /**
+     * Given two sorted arrays, return the elements present only in the first, preferentially returning the first array
+     * itself if possible
+     *
+     * @param left     sorted array of unique items to filter
+     * @param right    sorted array of unique items to remove from {@code left}
+     * @param allocate creates the result array when at least one item of {@code left} has to be dropped
+     * @return {@code left} itself if it shares no item with {@code right}, otherwise a new, exactly-sized sorted
+     *         array holding the items of {@code left} that do not occur in {@code right}
+     */
+    @SuppressWarnings("unused") // was used until recently, might be used again?
+    public static <T extends Comparable<? super T>> T[] linearDifference(T[] left, T[] right, IntFunction<T[]> allocate)
+    {
+        int rightIdx = 0;
+        int leftIdx = 0;
+
+        T[] result = null;
+        int resultSize = 0;
+
+        // phase one: scan until the first shared item; until then left may still be returned untouched
+        while (leftIdx < left.length && rightIdx < right.length)
+        {
+            T leftKey = left[leftIdx];
+            T rightKey = right[rightIdx];
+            int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+            if (cmp == 0)
+            {
+                // first item to drop: copy the prefix kept so far and switch to building a new array
+                resultSize = leftIdx++;
+                ++rightIdx;
+                result = allocate.apply(resultSize + left.length - leftIdx);
+                System.arraycopy(left, 0, result, 0, resultSize);
+                break;
+            }
+            else if (cmp < 0)
+            {
+                ++leftIdx;
+            }
+            else
+            {
+                ++rightIdx;
+            }
+        }
+
+        if (result == null)
+            return left;
+
+        // phase two: keep every remaining left item that has no counterpart in right
+        while (leftIdx < left.length && rightIdx < right.length)
+        {
+            T leftKey = left[leftIdx];
+            T rightKey = right[rightIdx];
+            int cmp = leftKey == rightKey ? 0 : leftKey.compareTo(rightKey);
+
+            if (cmp < 0)
+            {
+                // left item sorts before everything remaining in right, so it cannot occur there: keep it
+                result[resultSize++] = left[leftIdx++];
+            }
+            else if (cmp > 0)
+            {
+                // right item sorts before the current left item: irrelevant, skip it
+                ++rightIdx;
+            }
+            else
+            {
+                // shared item: drop it
+                ++leftIdx;
+                ++rightIdx;
+            }
+        }
+        // right is exhausted, so whatever remains of left is kept
+        while (leftIdx < left.length)
+            result[resultSize++] = left[leftIdx++];
+
+        if (resultSize < result.length)
+            result = Arrays.copyOf(result, resultSize);
+
+        return result;
+    }
+
+    /**
+     * Given two sorted arrays {@code slice} and {@code select}, where each array's contents is unique and non-overlapping
+     * with itself, but may match multiple entries in the other array, return an array containing the elements of {@code slice}
+     * that match elements of {@code select} as per the provided comparators.
+     *
+     * The result is not always a new array: {@code slice} itself is returned when every one of its elements matched;
+     * a copy of its leading elements is returned (without calling {@code factory}) when only a gap-free prefix matched;
+     * otherwise a {@code factory}-allocated array is filled and, if it turned out over-sized, trimmed with {@code Arrays.copyOf}.
+     *
+     * @param slice   the sorted array to filter
+     * @param select  the sorted array to match against
+     * @param factory allocates the result array; only invoked once a non-matching element is found ahead of a later match
+     * @param cmp1    passed through to {@code findNextIntersection}; presumably compares an element of {@code slice}
+     *                with an element of {@code select} (TODO confirm against that method)
+     * @param cmp2    compares an element of {@code select} with an element of {@code slice}; also used directly to locate
+     *                the last element of {@code slice} matching the current {@code select} entry
+     * @return the matching elements of {@code slice}, in their original order
+     */
+    public static <A, R> A[] sliceWithMultipleMatches(A[] slice, R[] select, IntFunction<A[]> factory, AsymmetricComparator<A, R> cmp1, AsymmetricComparator<R, A> cmp2)
+    {
+        A[] result;
+        int resultCount;
+        int ai = 0, ri = 0; // cursors into slice and select respectively
+        while (true) // phase 1: advance while the matches still form a gap-free prefix of slice, so nothing needs copying
+        {
+            long ari = findNextIntersection(slice, ai, slice.length, select, ri, select.length, cmp1, cmp2, Search.CEIL);
+            if (ari < 0) // a negative value is treated as "no further intersection"
+            {
+                if (ai == slice.length)
+                    return slice; // all elements of slice were found in select, so can return the array unchanged
+
+                // The first ai elements (indices 0 to ai - 1) are present (without a gap), so copy just that prefix
+                return Arrays.copyOf(slice, ai);
+            }
+
+            int nextai = (int)(ari >>> 32); // upper 32 bits are used as the position in slice of the next match
+            if (ai != nextai)
+            {
+                // A gap is detected in slice!
+                // Whenever ai == nextai we "consume" the match by moving past the last element of slice matching select[ri],
+                // so ai always points at an element of slice whose presence in select is not yet known;
+                // ai != nextai therefore means the elements in [ai, nextai) have no match and must be dropped.
+                resultCount = ai; // everything before the gap is retained
+                result = factory.apply(ai + (slice.length - nextai)); // retained prefix plus every element from nextai onwards
+                System.arraycopy(slice, 0, result, 0, resultCount);
+                ai = nextai;
+                ri = (int)ari; // lower 32 bits are used as the position in select of the next match
+                break;
+            }
+
+            ri = (int)ari;
+            // Several elements of slice may match select[ri]; find the last such element and move past it.
+            // slice[nextai] is known to be present, so the next element to examine is the one following that run.
+            ai = exponentialSearch(slice, nextai, slice.length, select[ri], cmp2, Search.FLOOR) + 1;
+        }
+
+        while (true) // phase 2: a gap was found, so matching runs are now copied into result
+        {
+            // Find the next element after the last element matching select[ri] and copy that run from slice into result.
+            // The search result may be negative when nothing matches (such as -1), in which case the +1 leaves nextai negative
+            // or 0; as ai < nextai is then false the copy loop is skipped, so no explicit check for negative values is needed.
+            int nextai = exponentialSearch(slice, ai, slice.length, select[ri], cmp2, Search.FLOOR) + 1;
+            while (ai < nextai)
+                result[resultCount++] = slice[ai++];
+
+            long ari = findNextIntersection(slice, ai, slice.length, select, ri, select.length, cmp1, cmp2, Search.CEIL);
+            if (ari < 0)
+            {
+                if (resultCount < result.length) // the allocation was an upper bound; trim the unused tail
+                    result = Arrays.copyOf(result, resultCount);
+
+                return result;
+            }
+
+            ai = (int)(ari >>> 32);
+            ri = (int)ari;
+        }
+    }
+
+    /**
+     * Copy-on-write insert into the provided array; returns the same array if item already present, or a new array
+     * with the item in the correct position if not. Linear time complexity.
+     */
+    public static <T extends Comparable<? super T>> T[] insert(T[] src, T item, IntFunction<T[]> factory)
+    {
+        int insertPos = Arrays.binarySearch(src, item);
+        if (insertPos >= 0)
+            return src;
+        insertPos = -1 - insertPos;
+
+        T[] trg = factory.apply(src.length + 1);
+        System.arraycopy(src, 0, trg, 0, insertPos);
+        trg[insertPos] = item;
+        System.arraycopy(src, insertPos, trg, insertPos + 1, src.length - insertPos);
+        return trg;
+    }
+
+    /**
+     * Equivalent to {@link Arrays#binarySearch}, only more efficient algorithmically for linear merges.
+     * Binary search has worst case complexity {@code O(n.lg n)} for a linear merge, whereas exponential search
+     * has a worst case of {@code O(n)}. However compared to a simple linear merge, the best case for exponential
+     * search is {@code O(lg(n))} instead of {@code O(n)}.
+     */
+    public static <T1, T2 extends Comparable<? super T1>> int exponentialSearch(T1[] in, int from, int to, T2 find)
+    {
+        return exponentialSearch(in, from, to, find, Comparable::compareTo, Search.FAST);
+    }
+
+    /**
+     * Selects which index {@code exponentialSearch} reports when the sought item matches several entries,
+     * and which neighbouring position is encoded into the result when it matches none.
+     */
+    public enum Search
+    {
+        /**
+         * If no matches, return -1 - [the highest index of any element that sorts before the sought item]
+         * If multiple matches, return the one with the highest index
+         *
+         * NOTE(review): the javadoc of the six-argument exponentialSearch describes the FLOOR no-match result as
+         * -2 - insertPos; confirm against the implementation which of the two descriptions is accurate.
+         */
+        FLOOR,
+
+        /**
+         * If no matches, return -1 - [the lowest index of any element that sorts after the sought item]
+         * If multiple matches, return the one with the lowest index
+         */
+        CEIL,
+
+        /**
+         * If no matches, return -1 - [the lowest index of any element that sorts after the sought item]
+         * If multiple matches, return an arbitrary matching index
+         */
+        FAST
+    }
+
+    /**
+     * Given a sorted array and an item to locate, use exponentialSearch to find a position in the array containing the item,
+     * or if not present an index relative to the item's position were it to be inserted. exponentialSearch offers greater
+     * efficiency than binarySearch when recursing over a list sequentially, finding matches within it.
+     *
+     * If multiple entries match, return either:
+     *  FAST: the first we encounter
+     *  FLOOR: the highest matching array index
+     *  CEIL: the lowest matching array index
+     *
+     * If no entries match, similar to Arrays.binarySearch return either:
+     *  FAST, CEIL: the entry following {@code find}, i.e. -1 - insertPos (== Arrays.binarySearch)
+     *  FLOOR:      the entry preceding {@code find}, i.e. -2 - insertPos
+     */
+    @Inline
+    public static <T1, T2> int exponentialSearch(T2[] in, int from, int to, T1 find, AsymmetricComparator<T1, T2> comparator, Search op)
+    {
+        int step = 0;
+        loop: while (from + step < to)
+        {
+            int i = from + step;
+            int c = comparator.compare(find, in[i]);
+            if (c < 0)
+            {
+                to = i;
+         &nbs