IGNITE-17739 Add Partition Awareness to all table APIs in Java client (#1119) main
authorPavel Tupitsyn <ptupitsyn@apache.org>
Mon, 26 Sep 2022 09:29:12 +0000 (12:29 +0300)
committerGitHub <noreply@github.com>
Mon, 26 Sep 2022 09:29:12 +0000 (12:29 +0300)
* Move common hash calculation logic to `ClientTupleSerializer`.
* Add partition awareness to all APIs in `ClientRecordView`, `ClientRecordBinaryView`, `ClientKeyValueView`, `ClientKeyValueBinaryView`.

modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTupleSerializer.java
modules/client/src/test/java/org/apache/ignite/client/PartitionAwarenessTest.java
modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java

index a589762a393cf4355d5dd997c8334ef91630f382..edf5be97ca5d42e5be12908499eb1197d556282a 100644 (file)
@@ -75,7 +75,9 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET,
                 (s, w) -> ser.writeTuple(tx, key, s, w, true),
-                ClientTupleSerializer::readValueTuple);
+                ClientTupleSerializer::readValueTuple,
+                null,
+                ClientTupleSerializer.getHashFunction(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -89,11 +91,16 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
     public @NotNull CompletableFuture<Map<Tuple, Tuple>> getAllAsync(@Nullable Transaction tx, @NotNull Collection<Tuple> keys) {
         Objects.requireNonNull(keys);
 
+        if (keys.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyMap());
+        }
+
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_ALL,
                 (s, w) -> ser.writeTuples(tx, keys, s, w, true),
                 ClientTupleSerializer::readKvTuplesNullable,
-                Collections.emptyMap());
+                Collections.emptyMap(),
+                ClientTupleSerializer.getHashFunction(tx, keys.iterator().next()));
     }
 
     /**
@@ -130,7 +137,9 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET,
                 (s, w) -> ser.writeTuple(tx, key, s, w, true),
-                (s, r) -> IgniteUtils.nonNullOrElse(ClientTupleSerializer.readValueTuple(s, r), defaultValue));
+                (s, r) -> IgniteUtils.nonNullOrElse(ClientTupleSerializer.readValueTuple(s, r), defaultValue),
+                null,
+                ClientTupleSerializer.getHashFunction(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -147,7 +156,8 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_CONTAINS_KEY,
                 (s, w) -> ser.writeTuple(tx, key, s, w, true),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -164,7 +174,8 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_UPSERT,
                 (s, w) -> ser.writeKvTuple(tx, key, val, s, w, false),
-                r -> null);
+                r -> null,
+                ClientTupleSerializer.getHashFunction(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -178,15 +189,20 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
     public @NotNull CompletableFuture<Void> putAllAsync(@Nullable Transaction tx, @NotNull Map<Tuple, Tuple> pairs) {
         Objects.requireNonNull(pairs);
 
+        if (pairs.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_UPSERT_ALL,
                 (s, w) -> ser.writeKvTuples(tx, pairs, s, w),
-                r -> null);
+                r -> null,
+                ClientTupleSerializer.getHashFunction(tx, pairs.keySet().iterator().next()));
     }
 
     /** {@inheritDoc} */
     @Override
-    public Tuple getAndPut(@Nullable Transaction tx, @NotNull Tuple key, Tuple val) {
+    public Tuple getAndPut(@Nullable Transaction tx, @NotNull Tuple key, @NotNull Tuple val) {
         return sync(getAndPutAsync(tx, key, val));
     }
 
@@ -199,7 +215,9 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_AND_UPSERT,
                 (s, w) -> ser.writeKvTuple(tx, key, val, s, w, false),
-                ClientTupleSerializer::readValueTuple);
+                ClientTupleSerializer::readValueTuple,
+                null,
+                ClientTupleSerializer.getHashFunction(tx, key));
     }
 
     /**
@@ -237,7 +255,8 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_INSERT,
                 (s, w) -> ser.writeKvTuple(tx, key, val, s, w, false),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -260,7 +279,8 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_DELETE,
                 (s, w) -> ser.writeTuple(tx, key, s, w, true),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -272,7 +292,8 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_DELETE_EXACT,
                 (s, w) -> ser.writeKvTuple(tx, key, val, s, w, false),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -286,11 +307,16 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
     public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(@Nullable Transaction tx, @NotNull Collection<Tuple> keys) {
         Objects.requireNonNull(keys);
 
+        if (keys.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_DELETE_ALL,
                 (s, w) -> ser.writeTuples(tx, keys, s, w, true),
                 (s, r) -> ClientTupleSerializer.readTuples(s, r, true),
-                Collections.emptyList());
+                Collections.emptyList(),
+                ClientTupleSerializer.getHashFunction(tx, keys.iterator().next()));
     }
 
     /** {@inheritDoc} */
@@ -307,7 +333,9 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_AND_DELETE,
                 (s, w) -> ser.writeTuple(tx, key, s, w, true),
-                ClientTupleSerializer::readValueTuple);
+                ClientTupleSerializer::readValueTuple,
+                null,
+                ClientTupleSerializer.getHashFunction(tx, key));
     }
 
     /**
@@ -350,7 +378,8 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_REPLACE,
                 (s, w) -> ser.writeKvTuple(tx, key, val, s, w, false),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -364,7 +393,8 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
                     ser.writeKvTuple(tx, key, oldVal, s, w, false);
                     ser.writeKvTuple(tx, key, newVal, s, w, true);
                 },
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -382,7 +412,9 @@ public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_AND_REPLACE,
                 (s, w) -> ser.writeKvTuple(tx, key, val, s, w, false),
-                ClientTupleSerializer::readValueTuple);
+                ClientTupleSerializer::readValueTuple,
+                null,
+                ClientTupleSerializer.getHashFunction(tx, key));
     }
 
     /**
index 224ad509c3f71d40e8815e4688923d6345a68d59..ba7fb570d260cb0e5e99d6c04ad06db5637f8ae1 100644 (file)
@@ -95,7 +95,9 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET,
                 (s, w) -> keySer.writeRec(tx, key, s, w, TuplePart.KEY),
-                (s, r) -> valSer.readRec(s, r, TuplePart.VAL));
+                (s, r) -> valSer.readRec(s, r, TuplePart.VAL),
+                null,
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), key));
     }
 
     /** {@inheritDoc} */
@@ -141,7 +143,8 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
                 ClientOp.TUPLE_GET_ALL,
                 (s, w) -> keySer.writeRecs(tx, keys, s, w, TuplePart.KEY),
                 this::readGetAllResponse,
-                Collections.emptyMap());
+                Collections.emptyMap(),
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), keys.iterator().next()));
     }
 
     /** {@inheritDoc} */
@@ -158,7 +161,8 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_CONTAINS_KEY,
                 (s, w) -> keySer.writeRec(tx, key, s, w, TuplePart.KEY),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), key));
     }
 
     /** {@inheritDoc} */
@@ -175,7 +179,8 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_UPSERT,
                 (s, w) -> writeKeyValue(s, w, tx, key, val),
-                r -> null);
+                r -> null,
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), key));
     }
 
     /** {@inheritDoc} */
@@ -203,7 +208,8 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
                         writeKeyValueRaw(s, w, e.getKey(), e.getValue());
                     }
                 },
-                r -> null);
+                r -> null,
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), pairs.keySet().iterator().next()));
     }
 
     /** {@inheritDoc} */
@@ -221,7 +227,9 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_AND_UPSERT,
                 (s, w) -> writeKeyValue(s, w, tx, key, val),
-                (s, r) -> valSer.readRec(s, r, TuplePart.VAL));
+                (s, r) -> valSer.readRec(s, r, TuplePart.VAL),
+                null,
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), key));
     }
 
     /** {@inheritDoc} */
@@ -250,7 +258,8 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_INSERT,
                 (s, w) -> writeKeyValue(s, w, tx, key, val),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), key));
     }
 
     /** {@inheritDoc} */
@@ -273,7 +282,8 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_DELETE,
                 (s, w) -> keySer.writeRec(tx, key, s, w, TuplePart.KEY),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), key));
     }
 
     /** {@inheritDoc} */
@@ -284,7 +294,8 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_DELETE_EXACT,
                 (s, w) -> writeKeyValue(s, w, tx, key, val),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), key));
     }
 
     /** {@inheritDoc} */
@@ -306,7 +317,8 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
                 ClientOp.TUPLE_DELETE_ALL,
                 (s, w) -> keySer.writeRecs(tx, keys, s, w, TuplePart.KEY),
                 (s, r) -> keySer.readRecs(s, r, false, TuplePart.KEY),
-                Collections.emptyList());
+                Collections.emptyList(),
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), keys.iterator().next()));
     }
 
     /** {@inheritDoc} */
@@ -323,7 +335,9 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_AND_DELETE,
                 (s, w) -> keySer.writeRec(tx, key, s, w, TuplePart.KEY),
-                (s, r) -> valSer.readRec(s, r, TuplePart.VAL));
+                (s, r) -> valSer.readRec(s, r, TuplePart.VAL),
+                null,
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), key));
     }
 
     /** {@inheritDoc} */
@@ -360,7 +374,8 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_REPLACE,
                 (s, w) -> writeKeyValue(s, w, tx, key, val),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), key));
     }
 
     /** {@inheritDoc} */
@@ -375,7 +390,8 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
                     writeKeyValueRaw(s, w, key, oldVal);
                     writeKeyValueRaw(s, w, key, newVal);
                 },
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), key));
     }
 
     /** {@inheritDoc} */
@@ -393,7 +409,9 @@ public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_AND_REPLACE,
                 (s, w) -> writeKeyValue(s, w, tx, key, val),
-                (s, r) -> valSer.readRec(s, r, TuplePart.VAL));
+                (s, r) -> valSer.readRec(s, r, TuplePart.VAL),
+                null,
+                ClientTupleSerializer.getHashFunction(tx, keySer.mapper(), key));
     }
 
     /** {@inheritDoc} */
index 9751119b7f34c0da5f8b667c4bd461da0b683689..7847b23cd3a1b897050609a007001e1c67700cf3 100644 (file)
@@ -25,10 +25,8 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.ClientOp;
-import org.apache.ignite.internal.util.HashCalculator;
 import org.apache.ignite.table.InvokeProcessor;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
@@ -74,7 +72,7 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
                 (s, w) -> ser.writeTuple(tx, keyRec, s, w, true),
                 (s, r) -> ClientTupleSerializer.readValueTuple(s, r, keyRec),
                 null,
-                getHashFunction(tx, keyRec));
+                ClientTupleSerializer.getHashFunction(tx, keyRec));
     }
 
     /** {@inheritDoc} */
@@ -88,11 +86,16 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
     public @NotNull CompletableFuture<Collection<Tuple>> getAllAsync(@Nullable Transaction tx, @NotNull Collection<Tuple> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
+        if (keyRecs.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_ALL,
                 (s, w) -> ser.writeTuples(tx, keyRecs, s, w, true),
                 ClientTupleSerializer::readTuplesNullable,
-                Collections.emptyList());
+                Collections.emptyList(),
+                ClientTupleSerializer.getHashFunction(tx, keyRecs.iterator().next()));
     }
 
     /** {@inheritDoc} */
@@ -109,7 +112,8 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_UPSERT,
                 (s, w) -> ser.writeTuple(tx, rec, s, w),
-                r -> null);
+                r -> null,
+                ClientTupleSerializer.getHashFunction(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -123,10 +127,15 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
     public @NotNull CompletableFuture<Void> upsertAllAsync(@Nullable Transaction tx, @NotNull Collection<Tuple> recs) {
         Objects.requireNonNull(recs);
 
+        if (recs.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_UPSERT_ALL,
                 (s, w) -> ser.writeTuples(tx, recs, s, w, false),
-                r -> null);
+                r -> null,
+                ClientTupleSerializer.getHashFunction(tx, recs.iterator().next()));
     }
 
     /** {@inheritDoc} */
@@ -143,7 +152,9 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_AND_UPSERT,
                 (s, w) -> ser.writeTuple(tx, rec, s, w, false),
-                (s, r) -> ClientTupleSerializer.readValueTuple(s, r, rec));
+                (s, r) -> ClientTupleSerializer.readValueTuple(s, r, rec),
+                null,
+                ClientTupleSerializer.getHashFunction(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -160,7 +171,8 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_INSERT,
                 (s, w) -> ser.writeTuple(tx, rec, s, w, false),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -174,11 +186,16 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
     public @NotNull CompletableFuture<Collection<Tuple>> insertAllAsync(@Nullable Transaction tx, @NotNull Collection<Tuple> recs) {
         Objects.requireNonNull(recs);
 
+        if (recs.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_INSERT_ALL,
                 (s, w) -> ser.writeTuples(tx, recs, s, w, false),
                 ClientTupleSerializer::readTuples,
-                Collections.emptyList());
+                Collections.emptyList(),
+                ClientTupleSerializer.getHashFunction(tx, recs.iterator().next()));
     }
 
     /** {@inheritDoc} */
@@ -201,7 +218,8 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_REPLACE,
                 (s, w) -> ser.writeTuple(tx, rec, s, w, false),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -216,7 +234,8 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
                     ser.writeTuple(tx, oldRec, s, w, false, false);
                     ser.writeTuple(tx, newRec, s, w, false, true);
                 },
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, oldRec));
     }
 
     /** {@inheritDoc} */
@@ -233,7 +252,9 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_AND_REPLACE,
                 (s, w) -> ser.writeTuple(tx, rec, s, w, false),
-                (s, r) -> ClientTupleSerializer.readValueTuple(s, r, rec));
+                (s, r) -> ClientTupleSerializer.readValueTuple(s, r, rec),
+                null,
+                ClientTupleSerializer.getHashFunction(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -250,7 +271,8 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_DELETE,
                 (s, w) -> ser.writeTuple(tx, keyRec, s, w, true),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, keyRec));
     }
 
     /** {@inheritDoc} */
@@ -267,7 +289,8 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_DELETE_EXACT,
                 (s, w) -> ser.writeTuple(tx, rec, s, w, false),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -284,7 +307,9 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_AND_DELETE,
                 (s, w) -> ser.writeTuple(tx, keyRec, s, w, true),
-                (s, r) -> ClientTupleSerializer.readValueTuple(s, r, keyRec));
+                (s, r) -> ClientTupleSerializer.readValueTuple(s, r, keyRec),
+                null,
+                ClientTupleSerializer.getHashFunction(tx, keyRec));
     }
 
     /** {@inheritDoc} */
@@ -298,11 +323,16 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
     public @NotNull CompletableFuture<Collection<Tuple>> deleteAllAsync(@Nullable Transaction tx, @NotNull Collection<Tuple> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
+        if (keyRecs.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_DELETE_ALL,
                 (s, w) -> ser.writeTuples(tx, keyRecs, s, w, true),
                 (s, r) -> ClientTupleSerializer.readTuples(s, r, true),
-                Collections.emptyList());
+                Collections.emptyList(),
+                ClientTupleSerializer.getHashFunction(tx, keyRecs.iterator().next()));
     }
 
     /** {@inheritDoc} */
@@ -316,11 +346,16 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
     public @NotNull CompletableFuture<Collection<Tuple>> deleteAllExactAsync(@Nullable Transaction tx, @NotNull Collection<Tuple> recs) {
         Objects.requireNonNull(recs);
 
+        if (recs.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_DELETE_ALL_EXACT,
                 (s, w) -> ser.writeTuples(tx, recs, s, w, false),
                 ClientTupleSerializer::readTuples,
-                Collections.emptyList());
+                Collections.emptyList(),
+                ClientTupleSerializer.getHashFunction(tx, recs.iterator().next()));
     }
 
     /** {@inheritDoc} */
@@ -358,21 +393,4 @@ public class ClientRecordBinaryView implements RecordView<Tuple> {
     ) {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
-
-    private Integer getColocationHash(ClientSchema schema, Tuple rec) {
-        var hashCalc = new HashCalculator();
-
-        for (ClientColumn col : schema.colocationColumns()) {
-            Object value = rec.valueOrDefault(col.name(), null);
-            hashCalc.append(value);
-        }
-
-        return hashCalc.hash();
-    }
-
-    @Nullable
-    private Function<ClientSchema, Integer> getHashFunction(@Nullable Transaction tx, @NotNull Tuple rec) {
-        // Disable partition awareness when transaction is used: tx belongs to a default connection.
-        return tx != null ? null : schema -> getColocationHash(schema, rec);
-    }
 }
index 35a12fc18116903d4b09f5184be32ec181d11721..3619a933f3b88a2d91603b6f92e2b7e1e946fa74 100644 (file)
@@ -70,7 +70,9 @@ public class ClientRecordView<R> implements RecordView<R> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET,
                 (s, w) -> ser.writeRec(tx, keyRec, s, w, TuplePart.KEY),
-                (s, r) -> ser.readValRec(keyRec, s, r));
+                (s, r) -> ser.readValRec(keyRec, s, r),
+                null,
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), keyRec));
     }
 
     /** {@inheritDoc} */
@@ -84,11 +86,16 @@ public class ClientRecordView<R> implements RecordView<R> {
     public @NotNull CompletableFuture<Collection<R>> getAllAsync(@Nullable Transaction tx, @NotNull Collection<R> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
+        if (keyRecs.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_ALL,
                 (s, w) -> ser.writeRecs(tx, keyRecs, s, w, TuplePart.KEY),
                 (s, r) -> ser.readRecs(s, r, true, TuplePart.KEY_AND_VAL),
-                Collections.emptyList());
+                Collections.emptyList(),
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), keyRecs.iterator().next()));
     }
 
     /** {@inheritDoc} */
@@ -105,7 +112,8 @@ public class ClientRecordView<R> implements RecordView<R> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_UPSERT,
                 (s, w) -> ser.writeRec(tx, rec, s, w, TuplePart.KEY_AND_VAL),
-                r -> null);
+                r -> null,
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), rec));
     }
 
     /** {@inheritDoc} */
@@ -119,10 +127,15 @@ public class ClientRecordView<R> implements RecordView<R> {
     public @NotNull CompletableFuture<Void> upsertAllAsync(@Nullable Transaction tx, @NotNull Collection<R> recs) {
         Objects.requireNonNull(recs);
 
+        if (recs.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_UPSERT_ALL,
                 (s, w) -> ser.writeRecs(tx, recs, s, w, TuplePart.KEY_AND_VAL),
-                r -> null);
+                r -> null,
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), recs.iterator().next()));
     }
 
     /** {@inheritDoc} */
@@ -139,7 +152,9 @@ public class ClientRecordView<R> implements RecordView<R> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_AND_UPSERT,
                 (s, w) -> ser.writeRec(tx, rec, s, w, TuplePart.KEY_AND_VAL),
-                (s, r) -> ser.readValRec(rec, s, r));
+                (s, r) -> ser.readValRec(rec, s, r),
+                null,
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), rec));
     }
 
     /** {@inheritDoc} */
@@ -156,7 +171,8 @@ public class ClientRecordView<R> implements RecordView<R> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_INSERT,
                 (s, w) -> ser.writeRec(tx, rec, s, w, TuplePart.KEY_AND_VAL),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), rec));
     }
 
     /** {@inheritDoc} */
@@ -170,11 +186,16 @@ public class ClientRecordView<R> implements RecordView<R> {
     public @NotNull CompletableFuture<Collection<R>> insertAllAsync(@Nullable Transaction tx, @NotNull Collection<R> recs) {
         Objects.requireNonNull(recs);
 
+        if (recs.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_INSERT_ALL,
                 (s, w) -> ser.writeRecs(tx, recs, s, w, TuplePart.KEY_AND_VAL),
                 (s, r) -> ser.readRecs(s, r, false, TuplePart.KEY_AND_VAL),
-                Collections.emptyList());
+                Collections.emptyList(),
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), recs.iterator().next()));
     }
 
     /** {@inheritDoc} */
@@ -197,7 +218,8 @@ public class ClientRecordView<R> implements RecordView<R> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_REPLACE,
                 (s, w) -> ser.writeRec(tx, rec, s, w, TuplePart.KEY_AND_VAL),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), rec));
     }
 
     /** {@inheritDoc} */
@@ -209,7 +231,8 @@ public class ClientRecordView<R> implements RecordView<R> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_REPLACE_EXACT,
                 (s, w) -> ser.writeRecs(tx, oldRec, newRec, s, w, TuplePart.KEY_AND_VAL),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), oldRec));
     }
 
     /** {@inheritDoc} */
@@ -226,7 +249,9 @@ public class ClientRecordView<R> implements RecordView<R> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_AND_REPLACE,
                 (s, w) -> ser.writeRec(tx, rec, s, w, TuplePart.KEY_AND_VAL),
-                (s, r) -> ser.readValRec(rec, s, r));
+                (s, r) -> ser.readValRec(rec, s, r),
+                null,
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), rec));
     }
 
     /** {@inheritDoc} */
@@ -243,7 +268,8 @@ public class ClientRecordView<R> implements RecordView<R> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_DELETE,
                 (s, w) -> ser.writeRec(tx, keyRec, s, w, TuplePart.KEY),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), keyRec));
     }
 
     /** {@inheritDoc} */
@@ -260,7 +286,8 @@ public class ClientRecordView<R> implements RecordView<R> {
         return tbl.doSchemaOutOpAsync(
                 ClientOp.TUPLE_DELETE_EXACT,
                 (s, w) -> ser.writeRec(tx, rec, s, w, TuplePart.KEY_AND_VAL),
-                ClientMessageUnpacker::unpackBoolean);
+                ClientMessageUnpacker::unpackBoolean,
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), rec));
     }
 
     /** {@inheritDoc} */
@@ -277,7 +304,9 @@ public class ClientRecordView<R> implements RecordView<R> {
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_GET_AND_DELETE,
                 (s, w) -> ser.writeRec(tx, keyRec, s, w, TuplePart.KEY),
-                (s, r) -> ser.readValRec(keyRec, s, r));
+                (s, r) -> ser.readValRec(keyRec, s, r),
+                null,
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), keyRec));
     }
 
     /** {@inheritDoc} */
@@ -291,11 +320,16 @@ public class ClientRecordView<R> implements RecordView<R> {
     public @NotNull CompletableFuture<Collection<R>> deleteAllAsync(@Nullable Transaction tx, @NotNull Collection<R> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
+        if (keyRecs.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_DELETE_ALL,
                 (s, w) -> ser.writeRecs(tx, keyRecs, s, w, TuplePart.KEY),
                 (s, r) -> ser.readRecs(s, r, false, TuplePart.KEY),
-                Collections.emptyList());
+                Collections.emptyList(),
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), keyRecs.iterator().next()));
     }
 
     /** {@inheritDoc} */
@@ -309,11 +343,16 @@ public class ClientRecordView<R> implements RecordView<R> {
     public @NotNull CompletableFuture<Collection<R>> deleteAllExactAsync(@Nullable Transaction tx, @NotNull Collection<R> recs) {
         Objects.requireNonNull(recs);
 
+        if (recs.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+
         return tbl.doSchemaOutInOpAsync(
                 ClientOp.TUPLE_DELETE_ALL_EXACT,
                 (s, w) -> ser.writeRecs(tx, recs, s, w, TuplePart.KEY_AND_VAL),
                 (s, r) -> ser.readRecs(s, r, false, TuplePart.KEY_AND_VAL),
-                Collections.emptyList());
+                Collections.emptyList(),
+                ClientTupleSerializer.getHashFunction(tx, ser.mapper(), recs.iterator().next()));
     }
 
     /** {@inheritDoc} */
index 68ac843a7a70ff20190a06dca8783a4f9b84e4da..d7332807be0d8b3b539dcfe9b2bb5df61233c61c 100644 (file)
@@ -283,18 +283,8 @@ public class ClientTable implements Table {
 
         return CompletableFuture.allOf(schemaFut, partitionsFut)
                 .thenCompose(v -> {
-                    List<String> partitions = partitionsFut.getNow(null);
                     ClientSchema schema = schemaFut.getNow(null);
-
-                    String preferredNodeId = null;
-
-                    if (partitions != null && partitions.size() > 0 && hashFunction != null) {
-                        Integer hash = hashFunction.apply(schema);
-
-                        if (hash != null) {
-                            preferredNodeId = partitions.get(Math.abs(hash % partitions.size()));
-                        }
-                    }
+                    String preferredNodeId = getPreferredNodeId(hashFunction, partitionsFut.getNow(null), schema);
 
                     return ch.serviceAsync(opCode,
                             w -> writer.accept(schema, w),
@@ -325,6 +315,30 @@ public class ClientTable implements Table {
                                 r -> reader.apply(r.in())));
     }
 
+    <T> CompletableFuture<T> doSchemaOutOpAsync(
+            int opCode,
+            BiConsumer<ClientSchema, PayloadOutputChannel> writer,
+            Function<ClientMessageUnpacker, T> reader,
+            Function<ClientSchema, Integer> hashFunction) {
+
+        CompletableFuture<ClientSchema> schemaFut = getLatestSchema();
+        CompletableFuture<List<String>> partitionsFut = hashFunction == null
+                ? CompletableFuture.completedFuture(null)
+                : getPartitionAssignment();
+
+        return CompletableFuture.allOf(schemaFut, partitionsFut)
+                .thenCompose(v -> {
+                    ClientSchema schema = schemaFut.getNow(null);
+                    String preferredNodeId = getPreferredNodeId(hashFunction, partitionsFut.getNow(null), schema);
+
+                    return ch.serviceAsync(opCode,
+                            w -> writer.accept(schema, w),
+                            r -> reader.apply(r.in()),
+                            null,
+                            preferredNodeId);
+                });
+    }
+
     private <T> Object readSchemaAndReadData(
             ClientSchema knownSchema,
             ClientMessageUnpacker in,
@@ -403,4 +417,19 @@ public class ClientTable implements Table {
                     return res;
                 });
     }
+
+    @Nullable
+    private static String getPreferredNodeId(Function<ClientSchema, Integer> hashFunction, List<String> partitions, ClientSchema schema) {
+        if (partitions == null || partitions.isEmpty() || hashFunction == null) {
+            return null;
+        }
+
+        Integer hash = hashFunction.apply(schema);
+
+        if (hash == null) {
+            return null;
+        }
+
+        return partitions.get(Math.abs(hash % partitions.size()));
+    }
 }
index 0bf002c89f09010e6dfc43568ef0bdfb801cf9cc..43add6b8602f71cf740f4bea0ab638a14df8d840 100644 (file)
@@ -33,15 +33,19 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.client.PayloadOutputChannel;
 import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
 import org.apache.ignite.internal.client.proto.ClientDataType;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.client.proto.TuplePart;
+import org.apache.ignite.internal.util.HashCalculator;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -439,4 +443,40 @@ public class ClientTupleSerializer {
             throw new IgniteException(PROTOCOL_ERR, "Incorrect value type for column '" + col.name() + "': " + e.getMessage(), e);
         }
     }
+
+    @Nullable
+    static Function<ClientSchema, Integer> getHashFunction(@Nullable Transaction tx, @NotNull Tuple rec) {
+        // Disable partition awareness when transaction is used: tx belongs to a default connection.
+        return tx != null ? null : schema -> getColocationHash(schema, rec);
+    }
+
+    @Nullable
+    static Function<ClientSchema, Integer> getHashFunction(@Nullable Transaction tx, Mapper<?> mapper, @NotNull Object rec) {
+        // Disable partition awareness when transaction is used: tx belongs to a default connection.
+        return tx != null ? null : schema -> getColocationHash(schema, mapper, rec);
+    }
+
+    private static Integer getColocationHash(ClientSchema schema, Tuple rec) {
+        var hashCalc = new HashCalculator();
+
+        for (ClientColumn col : schema.colocationColumns()) {
+            Object value = rec.valueOrDefault(col.name(), null);
+            hashCalc.append(value);
+        }
+
+        return hashCalc.hash();
+    }
+
+    private static Integer getColocationHash(ClientSchema schema, Mapper<?> mapper, Object rec) {
+        // Colocation columns are always part of the key - https://cwiki.apache.org/confluence/display/IGNITE/IEP-86%3A+Colocation+Key.
+        var hashCalc = new HashCalculator();
+        var marsh = schema.getMarshaller(mapper, TuplePart.KEY);
+
+        for (ClientColumn col : schema.colocationColumns()) {
+            Object value = marsh.value(rec, col.schemaIndex());
+            hashCalc.append(value);
+        }
+
+        return hashCalc.hash();
+    }
 }
index 371a63ccc60962ea49d2d5872b88ac6fe22337e1..16f9c9f9b20773e82d6f52ca36ef88411e3d03a3 100644 (file)
@@ -21,6 +21,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import io.netty.util.ResourceLeakDetector;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.function.Consumer;
 import org.apache.ignite.Ignite;
@@ -28,9 +30,11 @@ import org.apache.ignite.client.fakes.FakeIgnite;
 import org.apache.ignite.client.fakes.FakeIgniteTables;
 import org.apache.ignite.client.fakes.FakeInternalTable;
 import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -93,7 +97,7 @@ public class PartitionAwarenessTest extends AbstractClientTest {
     }
 
     @Test
-    public void testGetRoutesRequestToPrimaryNode() {
+    public void testGetTupleRoutesRequestToPrimaryNode() {
         RecordView<Tuple> recordView = defaultTable().recordView();
 
         assertOpOnNode("server-1", "get", x -> recordView.get(null, Tuple.create().set("ID", 0L)));
@@ -102,6 +106,37 @@ public class PartitionAwarenessTest extends AbstractClientTest {
         assertOpOnNode("server-2", "get", x -> recordView.get(null, Tuple.create().set("ID", 3L)));
     }
 
+    @Test
+    public void testGetRecordRoutesRequestToPrimaryNode() {
+        RecordView<AbstractClientTableTest.PersonPojo> pojoView = defaultTable().recordView(
+                Mapper.of(AbstractClientTableTest.PersonPojo.class));
+
+        assertOpOnNode("server-1", "get", x -> pojoView.get(null, new AbstractClientTableTest.PersonPojo(0L)));
+        assertOpOnNode("server-2", "get", x -> pojoView.get(null, new AbstractClientTableTest.PersonPojo(1L)));
+        assertOpOnNode("server-1", "get", x -> pojoView.get(null, new AbstractClientTableTest.PersonPojo(2L)));
+        assertOpOnNode("server-2", "get", x -> pojoView.get(null, new AbstractClientTableTest.PersonPojo(3L)));
+    }
+
+    @Test
+    public void testGetKeyValueRoutesRequestToPrimaryNode() {
+        KeyValueView<Long, String> kvView = defaultTable().keyValueView(Mapper.of(Long.class), Mapper.of(String.class));
+
+        assertOpOnNode("server-1", "get", x -> kvView.get(null, 0L));
+        assertOpOnNode("server-2", "get", x -> kvView.get(null, 1L));
+        assertOpOnNode("server-1", "get", x -> kvView.get(null, 2L));
+        assertOpOnNode("server-2", "get", x -> kvView.get(null, 3L));
+    }
+
+    @Test
+    public void testGetKeyValueBinaryRoutesRequestToPrimaryNode() {
+        KeyValueView<Tuple, Tuple> kvView = defaultTable().keyValueView();
+
+        assertOpOnNode("server-1", "get", x -> kvView.get(null, Tuple.create().set("ID", 0L)));
+        assertOpOnNode("server-2", "get", x -> kvView.get(null, Tuple.create().set("ID", 1L)));
+        assertOpOnNode("server-1", "get", x -> kvView.get(null, Tuple.create().set("ID", 2L)));
+        assertOpOnNode("server-2", "get", x -> kvView.get(null, Tuple.create().set("ID", 3L)));
+    }
+
     @Test
     public void testNonNullTxDisablesPartitionAwareness() {
         RecordView<Tuple> recordView = defaultTable().recordView();
@@ -170,22 +205,210 @@ public class PartitionAwarenessTest extends AbstractClientTest {
 
     @Test
     public void testAllRecordViewOperations() {
-        // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+        RecordView<AbstractClientTableTest.PersonPojo> pojoView = defaultTable().recordView(
+                Mapper.of(AbstractClientTableTest.PersonPojo.class));
+
+        var t1 = new AbstractClientTableTest.PersonPojo(0L);
+        var t2 = new AbstractClientTableTest.PersonPojo(1L);
+
+        assertOpOnNode("server-1", "insert", x -> pojoView.insert(null, t1));
+        assertOpOnNode("server-2", "insert", x -> pojoView.insert(null, t2));
+
+        assertOpOnNode("server-1", "insertAll", x -> pojoView.insertAll(null, List.of(t1)));
+        assertOpOnNode("server-2", "insertAll", x -> pojoView.insertAll(null, List.of(t2)));
+
+        assertOpOnNode("server-1", "upsert", x -> pojoView.upsert(null, t1));
+        assertOpOnNode("server-2", "upsert", x -> pojoView.upsert(null, t2));
+
+        assertOpOnNode("server-1", "upsertAll", x -> pojoView.upsertAll(null, List.of(t1)));
+        assertOpOnNode("server-2", "upsertAll", x -> pojoView.upsertAll(null, List.of(t2)));
+
+        assertOpOnNode("server-1", "get", x -> pojoView.get(null, t1));
+        assertOpOnNode("server-2", "get", x -> pojoView.get(null, t2));
+
+        assertOpOnNode("server-1", "getAll", x -> pojoView.getAll(null, List.of(t1)));
+        assertOpOnNode("server-2", "getAll", x -> pojoView.getAll(null, List.of(t2)));
+
+        assertOpOnNode("server-1", "getAndUpsert", x -> pojoView.getAndUpsert(null, t1));
+        assertOpOnNode("server-2", "getAndUpsert", x -> pojoView.getAndUpsert(null, t2));
+
+        assertOpOnNode("server-1", "getAndReplace", x -> pojoView.getAndReplace(null, t1));
+        assertOpOnNode("server-2", "getAndReplace", x -> pojoView.getAndReplace(null, t2));
+
+        assertOpOnNode("server-1", "getAndDelete", x -> pojoView.getAndDelete(null, t1));
+        assertOpOnNode("server-2", "getAndDelete", x -> pojoView.getAndDelete(null, t2));
+
+        assertOpOnNode("server-1", "replace", x -> pojoView.replace(null, t1));
+        assertOpOnNode("server-2", "replace", x -> pojoView.replace(null, t2));
+
+        assertOpOnNode("server-1", "replace", x -> pojoView.replace(null, t1, t1));
+        assertOpOnNode("server-2", "replace", x -> pojoView.replace(null, t2, t2));
+
+        assertOpOnNode("server-1", "delete", x -> pojoView.delete(null, t1));
+        assertOpOnNode("server-2", "delete", x -> pojoView.delete(null, t2));
+
+        assertOpOnNode("server-1", "deleteExact", x -> pojoView.deleteExact(null, t1));
+        assertOpOnNode("server-2", "deleteExact", x -> pojoView.deleteExact(null, t2));
+
+        assertOpOnNode("server-1", "deleteAll", x -> pojoView.deleteAll(null, List.of(t1)));
+        assertOpOnNode("server-2", "deleteAll", x -> pojoView.deleteAll(null, List.of(t2)));
+
+        assertOpOnNode("server-1", "deleteAllExact", x -> pojoView.deleteAllExact(null, List.of(t1)));
+        assertOpOnNode("server-2", "deleteAllExact", x -> pojoView.deleteAllExact(null, List.of(t2)));
     }
 
     @Test
     public void testAllRecordBinaryViewOperations() {
-        // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+        RecordView<Tuple> recordView = defaultTable().recordView();
+
+        Tuple t1 = Tuple.create().set("ID", 0L);
+        Tuple t2 = Tuple.create().set("ID", 1L);
+
+        assertOpOnNode("server-1", "insert", x -> recordView.insert(null, t1));
+        assertOpOnNode("server-2", "insert", x -> recordView.insert(null, t2));
+
+        assertOpOnNode("server-1", "insertAll", x -> recordView.insertAll(null, List.of(t1)));
+        assertOpOnNode("server-2", "insertAll", x -> recordView.insertAll(null, List.of(t2)));
+
+        assertOpOnNode("server-1", "upsert", x -> recordView.upsert(null, t1));
+        assertOpOnNode("server-2", "upsert", x -> recordView.upsert(null, t2));
+
+        assertOpOnNode("server-1", "upsertAll", x -> recordView.upsertAll(null, List.of(t1)));
+        assertOpOnNode("server-2", "upsertAll", x -> recordView.upsertAll(null, List.of(t2)));
+
+        assertOpOnNode("server-1", "get", x -> recordView.get(null, t1));
+        assertOpOnNode("server-2", "get", x -> recordView.get(null, t2));
+
+        assertOpOnNode("server-1", "getAll", x -> recordView.getAll(null, List.of(t1)));
+        assertOpOnNode("server-2", "getAll", x -> recordView.getAll(null, List.of(t2)));
+
+        assertOpOnNode("server-1", "getAndUpsert", x -> recordView.getAndUpsert(null, t1));
+        assertOpOnNode("server-2", "getAndUpsert", x -> recordView.getAndUpsert(null, t2));
+
+        assertOpOnNode("server-1", "getAndReplace", x -> recordView.getAndReplace(null, t1));
+        assertOpOnNode("server-2", "getAndReplace", x -> recordView.getAndReplace(null, t2));
+
+        assertOpOnNode("server-1", "getAndDelete", x -> recordView.getAndDelete(null, t1));
+        assertOpOnNode("server-2", "getAndDelete", x -> recordView.getAndDelete(null, t2));
+
+        assertOpOnNode("server-1", "replace", x -> recordView.replace(null, t1));
+        assertOpOnNode("server-2", "replace", x -> recordView.replace(null, t2));
+
+        assertOpOnNode("server-1", "replace", x -> recordView.replace(null, t1, t1));
+        assertOpOnNode("server-2", "replace", x -> recordView.replace(null, t2, t2));
+
+        assertOpOnNode("server-1", "delete", x -> recordView.delete(null, t1));
+        assertOpOnNode("server-2", "delete", x -> recordView.delete(null, t2));
+
+        assertOpOnNode("server-1", "deleteExact", x -> recordView.deleteExact(null, t1));
+        assertOpOnNode("server-2", "deleteExact", x -> recordView.deleteExact(null, t2));
+
+        assertOpOnNode("server-1", "deleteAll", x -> recordView.deleteAll(null, List.of(t1)));
+        assertOpOnNode("server-2", "deleteAll", x -> recordView.deleteAll(null, List.of(t2)));
+
+        assertOpOnNode("server-1", "deleteAllExact", x -> recordView.deleteAllExact(null, List.of(t1)));
+        assertOpOnNode("server-2", "deleteAllExact", x -> recordView.deleteAllExact(null, List.of(t2)));
     }
 
     @Test
     public void testAllKeyValueViewOperations() {
-        // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+        KeyValueView<Long, String> kvView = defaultTable().keyValueView(Mapper.of(Long.class), Mapper.of(String.class));
+
+        var k1 = 0L;
+        var k2 = 1L;
+        var v = "v";
+
+        assertOpOnNode("server-1", "insert", x -> kvView.putIfAbsent(null, k1, v));
+        assertOpOnNode("server-2", "insert", x -> kvView.putIfAbsent(null, k2, v));
+
+        assertOpOnNode("server-1", "upsert", x -> kvView.put(null, k1, v));
+        assertOpOnNode("server-2", "upsert", x -> kvView.put(null, k2, v));
+
+        assertOpOnNode("server-1", "upsertAll", x -> kvView.putAll(null, Map.of(k1, v)));
+        assertOpOnNode("server-2", "upsertAll", x -> kvView.putAll(null, Map.of(k2, v)));
+
+        assertOpOnNode("server-1", "get", x -> kvView.get(null, k1));
+        assertOpOnNode("server-2", "get", x -> kvView.get(null, k2));
+
+        assertOpOnNode("server-1", "get", x -> kvView.contains(null, k1));
+        assertOpOnNode("server-2", "get", x -> kvView.contains(null, k2));
+
+        assertOpOnNode("server-1", "getAll", x -> kvView.getAll(null, List.of(k1)));
+        assertOpOnNode("server-2", "getAll", x -> kvView.getAll(null, List.of(k2)));
+
+        assertOpOnNode("server-1", "getAndUpsert", x -> kvView.getAndPut(null, k1, v));
+        assertOpOnNode("server-2", "getAndUpsert", x -> kvView.getAndPut(null, k2, v));
+
+        assertOpOnNode("server-1", "getAndReplace", x -> kvView.getAndReplace(null, k1, v));
+        assertOpOnNode("server-2", "getAndReplace", x -> kvView.getAndReplace(null, k2, v));
+
+        assertOpOnNode("server-1", "getAndDelete", x -> kvView.getAndRemove(null, k1));
+        assertOpOnNode("server-2", "getAndDelete", x -> kvView.getAndRemove(null, k2));
+
+        assertOpOnNode("server-1", "replace", x -> kvView.replace(null, k1, v));
+        assertOpOnNode("server-2", "replace", x -> kvView.replace(null, k2, v));
+
+        assertOpOnNode("server-1", "replace", x -> kvView.replace(null, k1, v, v));
+        assertOpOnNode("server-2", "replace", x -> kvView.replace(null, k2, v, v));
+
+        assertOpOnNode("server-1", "delete", x -> kvView.remove(null, k1));
+        assertOpOnNode("server-2", "delete", x -> kvView.remove(null, k2));
+
+        assertOpOnNode("server-1", "deleteExact", x -> kvView.remove(null, k1, v));
+        assertOpOnNode("server-2", "deleteExact", x -> kvView.remove(null, k2, v));
+
+        assertOpOnNode("server-1", "deleteAll", x -> kvView.removeAll(null, List.of(k1)));
+        assertOpOnNode("server-2", "deleteAll", x -> kvView.removeAll(null, List.of(k2)));
     }
 
     @Test
     public void testAllKeyValueBinaryViewOperations() {
-        // TODO IGNITE-17739 Add Partition Awareness to all table APIs
+        KeyValueView<Tuple, Tuple> kvView = defaultTable().keyValueView();
+
+        Tuple t1 = Tuple.create().set("ID", 0L);
+        Tuple t2 = Tuple.create().set("ID", 1L);
+
+        assertOpOnNode("server-1", "insert", x -> kvView.putIfAbsent(null, t1, t1));
+        assertOpOnNode("server-2", "insert", x -> kvView.putIfAbsent(null, t2, t2));
+
+        assertOpOnNode("server-1", "upsert", x -> kvView.put(null, t1, t1));
+        assertOpOnNode("server-2", "upsert", x -> kvView.put(null, t2, t2));
+
+        assertOpOnNode("server-1", "upsertAll", x -> kvView.putAll(null, Map.of(t1, t1)));
+        assertOpOnNode("server-2", "upsertAll", x -> kvView.putAll(null, Map.of(t2, t2)));
+
+        assertOpOnNode("server-1", "get", x -> kvView.get(null, t1));
+        assertOpOnNode("server-2", "get", x -> kvView.get(null, t2));
+
+        assertOpOnNode("server-1", "get", x -> kvView.contains(null, t1));
+        assertOpOnNode("server-2", "get", x -> kvView.contains(null, t2));
+
+        assertOpOnNode("server-1", "getAll", x -> kvView.getAll(null, List.of(t1)));
+        assertOpOnNode("server-2", "getAll", x -> kvView.getAll(null, List.of(t2)));
+
+        assertOpOnNode("server-1", "getAndUpsert", x -> kvView.getAndPut(null, t1, t1));
+        assertOpOnNode("server-2", "getAndUpsert", x -> kvView.getAndPut(null, t2, t2));
+
+        assertOpOnNode("server-1", "getAndReplace", x -> kvView.getAndReplace(null, t1, t1));
+        assertOpOnNode("server-2", "getAndReplace", x -> kvView.getAndReplace(null, t2, t2));
+
+        assertOpOnNode("server-1", "getAndDelete", x -> kvView.getAndRemove(null, t1));
+        assertOpOnNode("server-2", "getAndDelete", x -> kvView.getAndRemove(null, t2));
+
+        assertOpOnNode("server-1", "replace", x -> kvView.replace(null, t1, t1));
+        assertOpOnNode("server-2", "replace", x -> kvView.replace(null, t2, t2));
+
+        assertOpOnNode("server-1", "replace", x -> kvView.replace(null, t1, t1, t1));
+        assertOpOnNode("server-2", "replace", x -> kvView.replace(null, t2, t2, t2));
+
+        assertOpOnNode("server-1", "delete", x -> kvView.remove(null, t1));
+        assertOpOnNode("server-2", "delete", x -> kvView.remove(null, t2));
+
+        assertOpOnNode("server-1", "deleteExact", x -> kvView.remove(null, t1, t1));
+        assertOpOnNode("server-2", "deleteExact", x -> kvView.remove(null, t2, t2));
+
+        assertOpOnNode("server-1", "deleteAll", x -> kvView.removeAll(null, List.of(t1)));
+        assertOpOnNode("server-2", "deleteAll", x -> kvView.removeAll(null, List.of(t2)));
     }
 
     private void assertOpOnNode(String expectedNode, String expectedOp, Consumer<Void> op) {
@@ -194,8 +417,8 @@ public class PartitionAwarenessTest extends AbstractClientTest {
 
         op.accept(null);
 
-        assertEquals(expectedNode, lastOpServerName);
         assertEquals(expectedOp, lastOp);
+        assertEquals(expectedNode, lastOpServerName, "Operation " + expectedOp + " was not executed on expected node");
     }
 
     private Table defaultTable() {
index 8750bbbfeb6510a4f2d58a1fbdfa30f185fff1e3..c2c33a67dacffd82fce7962a1e04dd4443bb9983 100644 (file)
@@ -179,7 +179,7 @@ public class RetryPolicyTest {
 
     @Test
     public void testRetryReadPolicyDoesNotRetryWriteOperations() throws Exception {
-        initServer(reqId -> reqId % 5 == 0);
+        initServer(reqId -> reqId % 6 == 0);
 
         try (var client = getClient(new RetryReadPolicy())) {
             RecordView<Tuple> recView = client.tables().table("t").recordView();
index 3c5f46226a4ba55346df5e20f5889dcac7f4555c..68a006326bb2eebf65963731c44e83457279b43b 100644 (file)
@@ -100,8 +100,6 @@ public class FakeInternalTable implements InternalTable {
     @Override
     public CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRowEx> keyRows,
             @Nullable InternalTransaction tx) {
-        onDataAccess("getAll", keyRows);
-
         var res = new ArrayList<BinaryRow>();
 
         for (var key : keyRows) {
@@ -112,6 +110,7 @@ public class FakeInternalTable implements InternalTable {
             }
         }
 
+        onDataAccess("getAll", keyRows);
         return CompletableFuture.completedFuture(res);
     }
 
@@ -128,12 +127,11 @@ public class FakeInternalTable implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
-        onDataAccess("upsertAll", rows);
-
         for (var row : rows) {
             upsert(row, tx);
         }
 
+        onDataAccess("upsertAll", rows);
         return CompletableFuture.completedFuture(null);
     }
 
@@ -141,36 +139,33 @@ public class FakeInternalTable implements InternalTable {
     @Override
     public CompletableFuture<BinaryRow> getAndUpsert(BinaryRowEx row,
             @Nullable InternalTransaction tx) {
-        onDataAccess("getAndUpsert", row);
-
         var res = get(row, tx);
 
         upsert(row, tx);
 
+        onDataAccess("getAndUpsert", row);
         return CompletableFuture.completedFuture(res.getNow(null));
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> insert(BinaryRowEx row, @Nullable InternalTransaction tx) {
-        onDataAccess("insert", row);
-
         var old = get(row, tx).getNow(null);
+        boolean res = false;
 
         if (old == null) {
             upsert(row, tx);
 
-            return CompletableFuture.completedFuture(true);
+            res = true;
         }
 
-        return CompletableFuture.completedFuture(false);
+        onDataAccess("insert", row);
+        return CompletableFuture.completedFuture(res);
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
-        onDataAccess("insertAll", rows);
-
         var skipped = new ArrayList<BinaryRow>();
 
         for (var row : rows) {
@@ -179,97 +174,100 @@ public class FakeInternalTable implements InternalTable {
             }
         }
 
+        onDataAccess("insertAll", rows);
         return CompletableFuture.completedFuture(skipped);
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> replace(BinaryRowEx row, @Nullable InternalTransaction tx) {
-        onDataAccess("replace", row);
-
         var old = get(row, tx).getNow(null);
 
         if (old == null) {
+            onDataAccess("replace", row);
             return CompletableFuture.completedFuture(false);
         }
 
-        return upsert(row, tx).thenApply(f -> true);
+        CompletableFuture<Void> upsert = upsert(row, tx);
+
+        onDataAccess("replace", row);
+        return upsert.thenApply(f -> true);
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> replace(BinaryRowEx oldRow, BinaryRowEx newRow, @Nullable InternalTransaction tx) {
-        onDataAccess("replace", oldRow);
-
         var old = get(oldRow, tx).getNow(null);
 
         if (old == null || !old.valueSlice().equals(oldRow.valueSlice())) {
+            onDataAccess("replace", oldRow);
             return CompletableFuture.completedFuture(false);
         }
 
-        return upsert(newRow, tx).thenApply(f -> true);
+        CompletableFuture<Void> upsert = upsert(newRow, tx);
+
+        onDataAccess("replace", oldRow);
+        return upsert.thenApply(f -> true);
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<BinaryRow> getAndReplace(BinaryRowEx row,
             @Nullable InternalTransaction tx) {
-        onDataAccess("getAndReplace", row);
-
         var old = get(row, tx);
 
-        return replace(row, tx).thenCompose(f -> old);
+        CompletableFuture<Boolean> replace = replace(row, tx);
+
+        onDataAccess("getAndReplace", row);
+        return replace.thenCompose(f -> old);
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> delete(BinaryRowEx keyRow, @Nullable InternalTransaction tx) {
-        onDataAccess("delete", keyRow);
-
         var old = get(keyRow, tx).getNow(null);
 
         if (old != null) {
             data.remove(keyRow.keySlice());
         }
 
+        onDataAccess("delete", keyRow);
         return CompletableFuture.completedFuture(old != null);
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Boolean> deleteExact(BinaryRowEx oldRow, @Nullable InternalTransaction tx) {
-        onDataAccess("deleteExact", oldRow);
+        var res = false;
 
         var old = get(oldRow, tx).getNow(null);
 
         if (old != null && old.valueSlice().equals(oldRow.valueSlice())) {
             data.remove(oldRow.keySlice());
-            return CompletableFuture.completedFuture(true);
+            res = true;
         }
 
-        return CompletableFuture.completedFuture(false);
+        onDataAccess("deleteExact", oldRow);
+        return CompletableFuture.completedFuture(res);
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<BinaryRow> getAndDelete(BinaryRowEx row,
             @Nullable InternalTransaction tx) {
-        onDataAccess("getAndDelete", row);
-
         var old = get(row, tx).getNow(null);
 
         if (old != null) {
             data.remove(row.keySlice());
         }
 
+        onDataAccess("getAndDelete", row);
         return CompletableFuture.completedFuture(old);
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
-        onDataAccess("deleteAll", rows);
-
         var skipped = new ArrayList<BinaryRow>();
 
         for (var row : rows) {
@@ -278,14 +276,13 @@ public class FakeInternalTable implements InternalTable {
             }
         }
 
+        onDataAccess("deleteAll", rows);
         return CompletableFuture.completedFuture(skipped);
     }
 
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) {
-        onDataAccess("deleteAllExact", rows);
-
         var skipped = new ArrayList<BinaryRow>();
 
         for (var row : rows) {
@@ -294,6 +291,7 @@ public class FakeInternalTable implements InternalTable {
             }
         }
 
+        onDataAccess("deleteAllExact", rows);
         return CompletableFuture.completedFuture(skipped);
     }