Implement `IgniteSet` in Java thin client.
* Identify sets by `name` and `cacheId`. Name alone is not enough, because target cache is determined by atomicity, cache mode, backups count, group name, etc. Passing `cacheId` is much simpler, and helps with affinity.
* Differences from thick API:
* Iterator is `AutoCloseable`. However, users only need to close it when it did not reach the end.
* Compute APIs are out of scope (`affinityCall`, `affinityRun`).
* `ClientIgniteSet#pageSize` controls batching.
* `ClientIgniteSet#serverKeepBinary` controls server-side deserialization behavior for user classes.
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.util.Iterator;
+
+/**
+ * Auto closeable iterator.
+ *
+ * @param <T> Element type.
+ */
+public interface ClientAutoCloseableIterator<T> extends Iterator<T>, AutoCloseable {
+ // No-op.
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Configuration for Ignite collections.
+ */
+public class ClientCollectionConfiguration {
+ /** Cache atomicity mode. */
+ private CacheAtomicityMode atomicityMode = ATOMIC;
+
+ /** Cache mode. */
+ private CacheMode cacheMode = PARTITIONED;
+
+ /** Number of backups. */
+ private int backups;
+
+ /** Colocated flag. */
+ private boolean colocated;
+
+ /** Group name. */
+ private String grpName;
+
+ /**
+ * @return {@code True} if all items within the same collection will be collocated on the same node.
+ */
+ public boolean isColocated() {
+ return colocated;
+ }
+
+ /**
+ * @param colocated If {@code true} then all items within the same collection will be collocated on the same node.
+ * Otherwise elements of the same set maybe be cached on different nodes. This parameter works only
+ * collections stored in {@link CacheMode#PARTITIONED} cache.
+ *
+ * @return {@code this} for chaining.
+ */
+ public ClientCollectionConfiguration setColocated(boolean colocated) {
+ this.colocated = colocated;
+
+ return this;
+ }
+
+ /**
+ * @return Cache atomicity mode.
+ */
+ public CacheAtomicityMode getAtomicityMode() {
+ return atomicityMode;
+ }
+
+ /**
+ * @param atomicityMode Cache atomicity mode.
+ * @return {@code this} for chaining.
+ */
+ public ClientCollectionConfiguration setAtomicityMode(CacheAtomicityMode atomicityMode) {
+ this.atomicityMode = atomicityMode;
+
+ return this;
+ }
+
+ /**
+ * @return Cache mode.
+ */
+ public CacheMode getCacheMode() {
+ return cacheMode;
+ }
+
+ /**
+ * @param cacheMode Cache mode.
+ * @return {@code this} for chaining.
+ */
+ public ClientCollectionConfiguration setCacheMode(CacheMode cacheMode) {
+ this.cacheMode = cacheMode;
+
+ return this;
+ }
+
+ /**
+ * @return Number of backups.
+ */
+ public int getBackups() {
+ return backups;
+ }
+
+ /**
+ * @param backups Cache number of backups.
+ * @return {@code this} for chaining.
+ */
+ public ClientCollectionConfiguration setBackups(int backups) {
+ this.backups = backups;
+
+ return this;
+ }
+
+ /**
+ * @return Group name.
+ */
+ public String getGroupName() {
+ return grpName;
+ }
+
+ /**
+ * @param grpName Group name.
+ * @return {@code this} for chaining.
+ */
+ public ClientCollectionConfiguration setGroupName(String grpName) {
+ this.grpName = grpName;
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ClientCollectionConfiguration.class, this);
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Distributed Set.
+ *
+ * <h1 class="header">Overview</h1>
+ * Cache set implements {@link Set} interface and provides all methods from collections.
+ *
+ * <h1 class="header">Colocated vs Non-colocated</h1>
+ * Set items can be placed on one node or distributed across grid nodes
+ * (governed by {@link ClientCollectionConfiguration#setColocated(boolean)} parameter).
+ * {@code Non-colocated} mode is provided only for partitioned caches.
+ * If {@code colocated} parameter is {@code true}, then all set items
+ * will be colocated on one node, otherwise items will be distributed across all grid nodes.
+ *
+ * @see IgniteClient#set(String, org.apache.ignite.client.ClientCollectionConfiguration)
+ */
+public interface ClientIgniteSet<T> extends Set<T>, Closeable {
+ /** {@inheritDoc} */
+ @Override boolean add(T o);
+
+ /** {@inheritDoc} */
+ @Override boolean addAll(Collection<? extends T> c);
+
+ /** {@inheritDoc} */
+ @Override void clear();
+
+ /** {@inheritDoc} */
+ @Override boolean contains(Object o);
+
+ /** {@inheritDoc} */
+ @Override boolean containsAll(Collection<?> c);
+
+ /** {@inheritDoc} */
+ @Override boolean isEmpty();
+
+ /**
+ * Returns an iterator over the elements in this collection.
+ * <p>
+ * There are no guarantees concerning the order in which the elements are returned.
+ * <p>
+ * Returned iterator is {@link AutoCloseable}: it may hold server-side resources and must be closed.
+ * It will close itself when the last page of data (see {@link #pageSize()}) is fetched from the server.
+ * When {@link Iterator#hasNext()} returns {@code false}, it is guaranteed that the iterator is closed.
+ * In other cases (incomplete iteration) the user must close the iterator.
+ *
+ * @return an Iterator over the elements in this collection.
+ */
+ @Override ClientAutoCloseableIterator<T> iterator();
+
+ /** {@inheritDoc} */
+ @Override boolean remove(Object o);
+
+ /** {@inheritDoc} */
+ @Override boolean removeAll(Collection<?> c);
+
+ /** {@inheritDoc} */
+ @Override boolean retainAll(Collection<?> c);
+
+ /** {@inheritDoc} */
+ @Override int size();
+
+ /** {@inheritDoc} */
+ @Override Object[] toArray();
+
+ /** {@inheritDoc} */
+ @Override <T1> T1[] toArray(T1[] a);
+
+ /**
+ * Removes this set.
+ */
+ @Override public void close();
+
+ /**
+ * Gets set name.
+ *
+ * @return Set name.
+ */
+ public String name();
+
+ /**
+ * Gets a value indicating whether all items of this set are stored on a single node.
+ *
+ * @return {@code True} if all items of this set are stored on a single node, {@code false} otherwise.
+ */
+ public boolean colocated();
+
+ /**
+ * Gets a value indicating whether this set has been removed ({@link #close()} was called).
+ *
+ * @return {@code True} if set was removed from cache, {@code false} otherwise.
+ */
+ public boolean removed();
+
+ /**
+ * Sets a value indicating whether user objects should be kept in binary form on the server, or deserialized.
+ * <p>
+ * Default is {@code true}: does not require classes on server, interoperable with other thin clients, performs better.
+ * Suitable for most use cases.
+ * <p>
+ * Set to {@code false} if there is a requirement to use deserialized objects in "thick" API ({@link org.apache.ignite.IgniteSet})
+ * together with thin client API, like in this scenario:
+ *
+ * <pre> {@code
+ * ClientIgniteSet<UserObj> clientSet = client.set("my-set", new ClientCollectionConfiguration());
+ * clientSet.serverKeepBinary(false);
+ *
+ * IgniteSet<UserObj> serverSet = server.set(clientSet.name(), null);
+ *
+ * clientSet.add(new UserObj(1, "client"));
+ * assert serverSet.contains(new UserObj(1, "client"));
+ * }</pre>
+ *
+ * @param keepBinary Whether to keep objects in binary form on the server.
+ * @return This set instance (for chaining).
+ */
+ public ClientIgniteSet<T> serverKeepBinary(boolean keepBinary);
+
+ /**
+ * Gets a value indicating whether user objects should be kept in binary form on the server, or deserialized.
+ * <p>
+ * Default is {@code true}: does not require classes on server, interoperable with other thin clients, performs better.
+ * Suitable for most use cases.
+ * <p>
+ * Set to {@code false} if there is a requirement to use deserialized objects in "thick" API ({@link org.apache.ignite.IgniteSet})
+ * together with thin client API, like in this scenario:
+ *
+ * <pre> {@code
+ * ClientIgniteSet<UserObj> clientSet = client.set("my-set", new ClientCollectionConfiguration());
+ * clientSet.serverKeepBinary(false);
+ *
+ * IgniteSet<UserObj> serverSet = server.set(clientSet.name(), null);
+ *
+ * clientSet.add(new UserObj(1, "client"));
+ * assert serverSet.contains(new UserObj(1, "client"));
+ * }</pre>
+ *
+ * @return {@code true} when user objects will be kept in binary form on the server, {@code false} otherwise.
+ */
+ public boolean serverKeepBinary();
+
+ /**
+ * Sets the page size to be used for batched network data retrieval in {@link #iterator()} and {@link #toArray()}.
+ *
+ * @param pageSize Page size.
+ * @return This set instance (for chaining).
+ */
+ public ClientIgniteSet<T> pageSize(int pageSize);
+
+ /**
+ * Gets the page size to be used for batched network data retrieval in {@link #iterator()} and {@link #toArray()}.
+ *
+ * @return Page size.
+ */
+ public int pageSize();
+}
package org.apache.ignite.client;
+import java.util.Collection;
import java.util.Set;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.Query;
/**
* AtomicLong.compareAndSet ({@link ClientAtomicLong#compareAndSet(long, long)}).
*/
- ATOMIC_LONG_VALUE_COMPARE_AND_SET
+ ATOMIC_LONG_VALUE_COMPARE_AND_SET,
+
+ /**
+ * Create an IgniteSet ({@link IgniteClient#set(String, ClientCollectionConfiguration)}).
+ */
+ SET_GET_OR_CREATE,
+
+ /**
+ * Remove an IgniteSet ({@link ClientIgniteSet#close()}).
+ */
+ SET_REMOVE,
+
+ /**
+ * Check if IgniteSet exists ({@link ClientIgniteSet#removed()}).
+ */
+ SET_EXISTS,
+
+ /**
+ * IgniteSet.add ({@link ClientIgniteSet#add(Object)}).
+ */
+ SET_VALUE_ADD,
+
+ /**
+ * IgniteSet.addAll ({@link ClientIgniteSet#addAll(Collection)}).
+ */
+ SET_VALUE_ADD_ALL,
+
+ /**
+ * IgniteSet.remove ({@link ClientIgniteSet#remove}).
+ */
+ SET_VALUE_REMOVE,
+
+ /**
+ * IgniteSet.removeAll ({@link ClientIgniteSet#removeAll}).
+ */
+ SET_VALUE_REMOVE_ALL,
+
+ /**
+ * IgniteSet.contains ({@link ClientIgniteSet#contains(Object)}).
+ */
+ SET_VALUE_CONTAINS,
+
+ /**
+ * IgniteSet.containsAll ({@link ClientIgniteSet#containsAll}).
+ */
+ SET_VALUE_CONTAINS_ALL,
+
+ /**
+ * IgniteSet.retainAll ({@link ClientIgniteSet#retainAll}).
+ */
+ SET_VALUE_RETAIN_ALL,
+
+ /**
+ * IgniteSet.size ({@link ClientIgniteSet#size()}).
+ */
+ SET_SIZE,
+
+ /**
+ * IgniteSet.clear ({@link ClientIgniteSet#clear()}).
+ */
+ SET_CLEAR,
+
+ /**
+ * IgniteSet.iterator ({@link ClientIgniteSet#iterator()}, {@link ClientIgniteSet#toArray()}).
+ */
+ SET_ITERATOR
}
import java.util.Collection;
import java.util.List;
import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.jetbrains.annotations.Nullable;
/**
* Ignite thin client.
*/
public ClientAtomicLong atomicLong(String name, ClientAtomicConfiguration cfg, long initVal, boolean create);
+ /**
+ * Gets a distributed set from cache. Creates one if it has not been created yet and {@code cfg} is not {@code null}.
+ *
+ * @param name Set name.
+ * @param cfg Set configuration if new set should be created.
+ * @param <T> Type of the elements in set.
+ * @return Set with given properties.
+ * @throws IgniteException If set could not be fetched or created.
+ */
+ public <T> ClientIgniteSet<T> set(String name, @Nullable ClientCollectionConfiguration cfg);
+
/**
* Closes this client's open connections and relinquishes all underlying resources.
*/
private IgnitePredicate<ClusterNode> nodeFilter;
/** Number of backups. */
- private int backups = 0;
+ private int backups;
/** Off-heap memory size. */
+ @Deprecated
private long offHeapMaxMem = -1;
/** Collocated flag. */
/**
* @return Off-heap memory size.
+ * @deprecated No longer used.
*/
+ @Deprecated
public long getOffHeapMaxMemory() {
return offHeapMaxMem;
}
/**
* @param offHeapMaxMemory Off-heap memory size.
* @return {@code this} for chaining.
+ * @deprecated No longer used.
*/
+ @Deprecated
public CollectionConfiguration setOffHeapMaxMemory(long offHeapMaxMemory) {
- this.offHeapMaxMem = offHeapMaxMemory;
+ offHeapMaxMem = offHeapMaxMemory;
return this;
}
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSet;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
* @param rebalanceEnabled rebalance enabled flag.
*/
public void rebalanceEnabled(boolean rebalanceEnabled);
+
+ /**
+ * Gets a set from cache by known cache id. Does not create new sets.
+ *
+ * @param name Set name.
+ * @param cacheId Cache id.
+ * @param collocated Colocated mode flag.
+ * @param separated Separated cache flag.
+ * @param <T> Type of the elements in set.
+ * @return Set with given properties.
+ * @throws IgniteException If set could not be fetched or created.
+ */
+ public <T> IgniteSet<T> set(String name, int cacheId, boolean collocated, boolean separated) throws IgniteException;
}
}
}
+ /** {@inheritDoc} */
+ @Override public <T> IgniteSet<T> set(String name, int cacheId, boolean collocated, boolean separated) throws IgniteException {
+ guard();
+
+ try {
+ checkClusterState();
+
+ return ctx.dataStructures().set(name, cacheId, collocated, separated);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ unguard();
+ }
+ }
+
/**
* The {@code ctx.gateway().readLock()} is used underneath.
*/
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.client.ClientAutoCloseableIterator;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientIgniteSet;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
+import org.apache.ignite.internal.processors.platform.client.IgniteClientException;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.A;
+
+/**
+ * Client Ignite Set.
+ */
+class ClientIgniteSetImpl<T> implements ClientIgniteSet<T> {
+ /** */
+ private final String name;
+
+ /** */
+ private final ReliableChannel ch;
+
+ /** */
+ private final ClientUtils serDes;
+
+ /** */
+ private final boolean colocated;
+
+ /** */
+ private final int cacheId;
+
+ /** */
+ private volatile boolean serverKeepBinary = true;
+
+ /** */
+ private volatile int pageSize = 1024;
+
+ /**
+ * Constructor.
+ * @param ch Channel.
+ * @param serDes Utils..
+ * @param name Name.
+ * @param colocated Colocated flag.
+ * @param cacheId Cache id.
+ */
+ public ClientIgniteSetImpl(
+ ReliableChannel ch,
+ ClientUtils serDes,
+ String name,
+ boolean colocated,
+ int cacheId) {
+ assert ch != null;
+ assert serDes != null;
+ assert name != null;
+
+ this.ch = ch;
+ this.serDes = serDes;
+ this.name = name;
+ this.colocated = colocated;
+ this.cacheId = cacheId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean add(T o) {
+ A.notNull(o, "o");
+
+ return singleKeyOp(ClientOperation.OP_SET_VALUE_ADD, o);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addAll(Collection<? extends T> c) {
+ A.notNull(c, "c");
+
+ return multiKeyOp(ClientOperation.OP_SET_VALUE_ADD_ALL, c);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clear() {
+ op(ClientOperation.OP_SET_CLEAR, null, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean contains(Object o) {
+ A.notNull(o, "o");
+
+ return singleKeyOp(ClientOperation.OP_SET_VALUE_CONTAINS, o);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean containsAll(Collection<?> c) {
+ A.notNull(c, "c");
+
+ return multiKeyOp(ClientOperation.OP_SET_VALUE_CONTAINS_ALL, c);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientAutoCloseableIterator<T> iterator() {
+ Consumer<PayloadOutputChannel> payloadWriter = out -> {
+ writeIdentity(out);
+ out.out().writeInt(pageSize);
+ };
+
+ Function<PayloadInputChannel, ClientAutoCloseableIterator<T>> payloadReader = in -> {
+ List<T> page = readPage(in);
+ boolean hasNext = in.in().readBoolean();
+ Long resourceId = hasNext ? in.in().readLong() : null;
+ ClientChannel resourceCh = hasNext ? in.clientChannel() : null;
+
+ return new PagedIterator(resourceCh, resourceId, page);
+ };
+
+ if (colocated) {
+ Object affinityKey = name.hashCode();
+
+ return ch.affinityService(cacheId, affinityKey, ClientOperation.OP_SET_ITERATOR_START, payloadWriter, payloadReader);
+ }
+
+ return ch.service(ClientOperation.OP_SET_ITERATOR_START, payloadWriter, payloadReader);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean remove(Object o) {
+ A.notNull(o, "o");
+
+ return singleKeyOp(ClientOperation.OP_SET_VALUE_REMOVE, o);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removeAll(Collection<?> c) {
+ A.notNull(c, "c");
+
+ return multiKeyOp(ClientOperation.OP_SET_VALUE_REMOVE_ALL, c);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean retainAll(Collection<?> c) {
+ A.notNull(c, "c");
+
+ if (c.isEmpty()) {
+ // Special case: remove all.
+ // Not the same as clear, because we need the boolean result.
+ return ch.service(ClientOperation.OP_SET_VALUE_RETAIN_ALL, out -> {
+ try (BinaryRawWriterEx w = serDes.createBinaryWriter(out.out())) {
+ writeIdentity(w);
+ w.writeBoolean(serverKeepBinary);
+ w.writeInt(0); // Size.
+ }
+ }, r -> r.in().readBoolean());
+ }
+
+ return multiKeyOp(ClientOperation.OP_SET_VALUE_RETAIN_ALL, c);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return op(ClientOperation.OP_SET_SIZE, null, r -> r.in().readInt());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object[] toArray() {
+ return toArray(X.EMPTY_OBJECT_ARRAY);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T1> T1[] toArray(T1[] a) {
+ try (ClientAutoCloseableIterator<T> it = iterator()) {
+ ArrayList<T1> res = new ArrayList<>();
+
+ while (it.hasNext())
+ res.add((T1)it.next());
+
+ return res.toArray(a);
+ }
+ catch (Exception e) {
+ throw new IgniteClientException(ClientStatus.FAILED, e.getMessage(), e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ op(ClientOperation.OP_SET_CLOSE, null, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean colocated() {
+ return colocated;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean removed() {
+ return !op(ClientOperation.OP_SET_EXISTS, null, r -> r.in().readBoolean());
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientIgniteSet<T> serverKeepBinary(boolean keepBinary) {
+ serverKeepBinary = keepBinary;
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean serverKeepBinary() {
+ return serverKeepBinary;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientIgniteSet<T> pageSize(int pageSize) {
+ if (pageSize <= 0)
+ throw new IllegalArgumentException("Page size must be greater than 0.");
+
+ this.pageSize = pageSize;
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int pageSize() {
+ return pageSize;
+ }
+
+ /**
+ * Performs a single key operation.
+ *
+ * @param op Op code.
+ * @param key Key.
+ * @return Result.
+ */
+ private Boolean singleKeyOp(ClientOperation op, Object key) {
+ Object affKey = affinityKey(key);
+
+ return ch.affinityService(cacheId, affKey, op, out -> {
+ try (BinaryRawWriterEx w = serDes.createBinaryWriter(out.out())) {
+ writeIdentity(w);
+
+ w.writeBoolean(serverKeepBinary);
+ w.writeObject(key);
+ }
+ }, r -> r.in().readBoolean());
+ }
+
+ /**
+ * Performs a multi key operation.
+ *
+ * @param op Op code.
+ * @param keys Keys.
+ * @return Result.
+ */
+ @SuppressWarnings("rawtypes")
+ private Boolean multiKeyOp(ClientOperation op, Collection keys) {
+ if (keys.isEmpty())
+ return false;
+
+ Iterator iter = keys.iterator();
+ Object firstKey = iter.next();
+
+ // Use the first key as affinity key as a simple optimization.
+ // Let the server map other keys to nodes, while still using no more than N network requests.
+ Object affKey = affinityKey(firstKey);
+
+ return ch.affinityService(cacheId, affKey, op, out -> {
+ try (BinaryRawWriterEx w = serDes.createBinaryWriter(out.out())) {
+ writeIdentity(w);
+
+ w.writeBoolean(serverKeepBinary);
+ w.writeInt(keys.size());
+
+ w.writeObject(firstKey);
+
+ while (iter.hasNext()) {
+ w.writeObject(iter.next());
+ }
+ }
+ }, r -> r.in().readBoolean());
+ }
+
+ /**
+ * Performs an operation.
+ *
+ * @param op Op code.
+ * @param writer Writer.
+ * @param reader Reader.
+ * @param <TR> Result type.
+ * @return Result.
+ */
+ private <TR> TR op(ClientOperation op, Consumer<BinaryRawWriterEx> writer, Function<PayloadInputChannel, TR> reader) {
+ return ch.service(op, out -> {
+ try (BinaryRawWriterEx w = serDes.createBinaryWriter(out.out())) {
+ writeIdentity(w);
+
+ if (writer != null)
+ writer.accept(w);
+ }
+ }, reader);
+ }
+
+ /**
+ * Writes identity.
+ *
+ * @param out Output channel.
+ */
+ private void writeIdentity(PayloadOutputChannel out) {
+ try (BinaryRawWriterEx w = serDes.createBinaryWriter(out.out())) {
+ writeIdentity(w);
+ }
+ }
+
+ /**
+ * Writes identity.
+ *
+ * @param w Writer.
+ */
+ private void writeIdentity(BinaryRawWriterEx w) {
+ // IgniteSet is uniquely identified by name, cacheId, and colocated flag.
+ // Just name and groupName are not enough, because target cache name depends on multiple config properties
+ // (atomicity mode, backups, etc).
+ // So cacheId replaces group name and all those properties. It also simplifies affinity calculation.
+ w.writeString(name);
+ w.writeInt(cacheId);
+ w.writeBoolean(colocated);
+ }
+
+ /**
+ * Gets affinity key for the user key.
+ *
+ * @param key Key.
+ * @return Affinity key.
+ */
+ private Object affinityKey(Object key) {
+ // CollocatedSetItemKey#setNameHash is AffinityKeyMapped.
+ if (colocated)
+ return name.hashCode();
+
+ // Only separated mode is supported by the client partition awareness,
+ // because older cluster nodes simply don't support this client feature.
+ // Server wraps user object into GridCacheSetItemKey, but setId is always null in separated mode,
+ // so the user object itself ends up as affinity key.
+ return key;
+ }
+
+ /**
+ * Reads iterator page.
+ *
+ * @param in Input channel.
+ * @return Page.
+ */
+ private List<T> readPage(PayloadInputChannel in) {
+ try (BinaryReaderExImpl r = serDes.createBinaryReader(in.in())) {
+ int size = r.readInt();
+ List<T> res = new ArrayList<>(size);
+
+ for (int i = 0; i < size; i++)
+ res.add((T)r.readObject());
+
+ return res;
+ }
+ catch (IOException e) {
+ throw new ClientException(e);
+ }
+ }
+
+ /**
+ * Paged iterator.
+ */
+ private class PagedIterator implements ClientAutoCloseableIterator<T> {
+ /** */
+ private final ClientChannel resourceCh;
+
+ /** */
+ private Long resourceId;
+
+ /** */
+ private List<T> page;
+
+ /** */
+ private int pos;
+
+ /**
+ * Constructor.
+ *
+ * @param resourceCh Associated channel.
+ * @param resourceId Resource id.
+ * @param page First page.
+ */
+ public PagedIterator(ClientChannel resourceCh, Long resourceId, List<T> page) {
+ assert page != null;
+ assert (resourceCh == null) == (resourceId == null);
+
+ this.resourceCh = resourceCh;
+ this.resourceId = resourceId;
+ this.page = page;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return pos < page.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ T next = page.get(pos++);
+
+ if (pos >= page.size() && resourceId != null)
+ fetchNextPage();
+
+ return next;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ Long id = resourceId;
+
+ if (id == null)
+ return;
+
+ ch.service(ClientOperation.RESOURCE_CLOSE, w -> w.out().writeLong(id), null);
+
+ resourceId = null;
+ pos = Integer.MAX_VALUE;
+ }
+
+ /**
+ * Fetches next page from the server.
+ */
+ private void fetchNextPage() {
+ page = resourceCh.service(
+ ClientOperation.OP_SET_ITERATOR_GET_PAGE,
+ out -> {
+ out.out().writeLong(resourceId);
+ out.out().writeInt(pageSize);
+ },
+ in -> {
+ List<T> res = readPage(in);
+ boolean hasNext = in.in().readBoolean();
+
+ if (!hasNext)
+ resourceId = null;
+
+ return res;
+ });
+
+ pos = 0;
+ }
+ }
+}
ATOMIC_LONG_VALUE_COMPARE_AND_SET(9006),
/** AtomicLong.compareAndSetAndGet. */
- ATOMIC_LONG_VALUE_COMPARE_AND_SET_AND_GET(9007);
+ ATOMIC_LONG_VALUE_COMPARE_AND_SET_AND_GET(9007),
+
+ /** Create an IgniteSet. */
+ OP_SET_GET_OR_CREATE(9010),
+
+ /** Remove an IgniteSet. */
+ OP_SET_CLOSE(9011),
+
+ /** Check if IgniteSet exists. */
+ OP_SET_EXISTS(9012),
+
+ /** IgniteSet.add. */
+ OP_SET_VALUE_ADD(9013),
+
+ /** IgniteSet.addAll. */
+ OP_SET_VALUE_ADD_ALL(9014),
+
+ /** IgniteSet.remove. */
+ OP_SET_VALUE_REMOVE(9015),
+
+ /** IgniteSet.removeAll. */
+ OP_SET_VALUE_REMOVE_ALL(9016),
+
+ /** IgniteSet.contains. */
+ OP_SET_VALUE_CONTAINS(9017),
+
+ /** IgniteSet.containsAll. */
+ OP_SET_VALUE_CONTAINS_ALL(9018),
+
+ /** IgniteSet.retainAll. */
+ OP_SET_VALUE_RETAIN_ALL(9019),
+
+ /** IgniteSet.size. */
+ OP_SET_SIZE(9020),
+
+ /** IgniteSet.clear. */
+ OP_SET_CLEAR(9021),
+
+ /** IgniteSet.iterator. */
+ OP_SET_ITERATOR_START(9022),
+
+ /** IgniteSet.iterator page. */
+ OP_SET_ITERATOR_GET_PAGE(9023);
/** Code. */
private final int code;
case ATOMIC_LONG_VALUE_COMPARE_AND_SET_AND_GET:
return ClientOperationType.ATOMIC_LONG_VALUE_COMPARE_AND_SET;
+ case OP_SET_GET_OR_CREATE:
+ return ClientOperationType.SET_GET_OR_CREATE;
+
+ case OP_SET_CLOSE:
+ return ClientOperationType.SET_REMOVE;
+
+ case OP_SET_EXISTS:
+ return ClientOperationType.SET_EXISTS;
+
+ case OP_SET_VALUE_ADD:
+ return ClientOperationType.SET_VALUE_ADD;
+
+ case OP_SET_VALUE_ADD_ALL:
+ return ClientOperationType.SET_VALUE_ADD_ALL;
+
+ case OP_SET_VALUE_REMOVE:
+ return ClientOperationType.SET_VALUE_REMOVE;
+
+ case OP_SET_VALUE_REMOVE_ALL:
+ return ClientOperationType.SET_VALUE_REMOVE_ALL;
+
+ case OP_SET_VALUE_CONTAINS:
+ return ClientOperationType.SET_VALUE_CONTAINS;
+
+ case OP_SET_VALUE_CONTAINS_ALL:
+ return ClientOperationType.SET_VALUE_CONTAINS_ALL;
+
+ case OP_SET_VALUE_RETAIN_ALL:
+ return ClientOperationType.SET_VALUE_RETAIN_ALL;
+
+ case OP_SET_SIZE:
+ return ClientOperationType.SET_SIZE;
+
+ case OP_SET_CLEAR:
+ return ClientOperationType.SET_CLEAR;
+
+ case OP_SET_ITERATOR_START:
+ return ClientOperationType.SET_ITERATOR;
+
default:
return null;
}
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.ClientCluster;
import org.apache.ignite.client.ClientClusterGroup;
+import org.apache.ignite.client.ClientCollectionConfiguration;
import org.apache.ignite.client.ClientCompute;
import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientIgniteSet;
import org.apache.ignite.client.ClientServices;
import org.apache.ignite.client.ClientTransactions;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.marshaller.MarshallerContext;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.jetbrains.annotations.Nullable;
/**
* Implementation of {@link IgniteClient} over TCP protocol.
if (create) {
ch.service(ClientOperation.ATOMIC_LONG_CREATE, out -> {
- try (BinaryRawWriterEx w = new BinaryWriterExImpl(null, out.out(), null, null)) {
- w.writeString(name);
- w.writeLong(initVal);
-
- if (cfg != null) {
- w.writeBoolean(true);
- w.writeInt(cfg.getAtomicSequenceReserveSize());
- w.writeByte((byte)cfg.getCacheMode().ordinal());
- w.writeInt(cfg.getBackups());
- w.writeString(cfg.getGroupName());
- }
- else
- w.writeBoolean(false);
+ writeString(name, out.out());
+ out.out().writeLong(initVal);
+
+ if (cfg != null) {
+ out.out().writeBoolean(true);
+ out.out().writeInt(cfg.getAtomicSequenceReserveSize());
+ out.out().writeByte((byte)cfg.getCacheMode().ordinal());
+ out.out().writeInt(cfg.getBackups());
+ writeString(cfg.getGroupName(), out.out());
}
+ else
+ out.out().writeBoolean(false);
}, null);
}
return res;
}
+ /** {@inheritDoc} */
+ @Override public <T> ClientIgniteSet<T> set(String name, @Nullable ClientCollectionConfiguration cfg) {
+ GridArgumentCheck.notNull(name, "name");
+
+ return ch.service(ClientOperation.OP_SET_GET_OR_CREATE, out -> {
+ writeString(name, out.out());
+
+ if (cfg != null) {
+ out.out().writeBoolean(true);
+ out.out().writeByte((byte)cfg.getAtomicityMode().ordinal());
+ out.out().writeByte((byte)cfg.getCacheMode().ordinal());
+ out.out().writeInt(cfg.getBackups());
+ writeString(cfg.getGroupName(), out.out());
+ out.out().writeBoolean(cfg.isColocated());
+ }
+ else
+ out.out().writeBoolean(false);
+ }, in -> {
+ if (!in.in().readBoolean())
+ return null;
+
+ boolean colocated = in.in().readBoolean();
+ int cacheId = in.in().readInt();
+
+ return new ClientIgniteSetImpl<>(ch, serDes, name, colocated, cacheId);
+ });
+ }
+
/**
* Initializes new instance of {@link IgniteClient}.
*
}
/** {@inheritDoc} */
+ @SuppressWarnings("rawtypes")
@Override public Class getClass(int typeId, ClassLoader ldr)
throws ClassNotFoundException, IgniteCheckedException {
}, cfg, name, grpName, SET, create, separated);
}
+ /**
+ * Gets a set from cache by known cache id. Does not create new sets.
+ *
+ * @param name Set name.
+ * @param cacheId Cache id.
+ * @param collocated Colocated mode flag.
+ * @param separated Separated cache flag.
+ * @return Set instance.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public <T> IgniteSet<T> set(String name, int cacheId, boolean collocated, boolean separated)
+ throws IgniteCheckedException {
+ A.notNull(name, "name");
+
+ DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheId);
+
+ if (desc == null)
+ return null;
+
+ IgniteInternalCache<Object, Object> cache = ctx.cache().cache(desc.cacheName());
+
+ if (cache == null)
+ return null;
+
+ return cache.context().dataStructures().set(name, collocated, false, separated);
+ }
+
/**
* @param name Set name.
* @param cctx Set cache context.
/**
* @return Cache context.
*/
- GridCacheContext context() {
+ public GridCacheContext context() {
return ctx;
}
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueCompareAndSetRequest;
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueGetAndSetRequest;
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueGetRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetClearRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetCloseRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetExistsRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetGetOrCreateRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetIteratorGetPageRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetIteratorStartRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetSizeRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetValueAddAllRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetValueAddRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetValueContainsAllRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetValueContainsRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetValueRemoveAllRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetValueRemoveRequest;
+import org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetValueRetainAllRequest;
import org.apache.ignite.internal.processors.platform.client.service.ClientServiceGetDescriptorRequest;
import org.apache.ignite.internal.processors.platform.client.service.ClientServiceGetDescriptorsRequest;
import org.apache.ignite.internal.processors.platform.client.service.ClientServiceInvokeRequest;
/** AtomicLong.compareAndSetAndGet. */
private static final short OP_ATOMIC_LONG_VALUE_COMPARE_AND_SET_AND_GET = 9007;
+ /** Create an IgniteSet. */
+ private static final short OP_SET_GET_OR_CREATE = 9010;
+
+ /** Remove an IgniteSet. */
+ private static final short OP_SET_CLOSE = 9011;
+
+ /** IgniteSet.removed. */
+ private static final short OP_SET_EXISTS = 9012;
+
+ /** IgniteSet.add. */
+ private static final short OP_SET_VALUE_ADD = 9013;
+
+ /** IgniteSet.addAll. */
+ private static final short OP_SET_VALUE_ADD_ALL = 9014;
+
+ /** IgniteSet.remove. */
+ private static final short OP_SET_VALUE_REMOVE = 9015;
+
+ /** IgniteSet.removeAll. */
+ private static final short OP_SET_VALUE_REMOVE_ALL = 9016;
+
+ /** IgniteSet.contains. */
+ private static final short OP_SET_VALUE_CONTAINS = 9017;
+
+ /** IgniteSet.containsAll. */
+ private static final short OP_SET_VALUE_CONTAINS_ALL = 9018;
+
+ /** IgniteSet.retainAll. */
+ private static final short OP_SET_VALUE_RETAIN_ALL = 9019;
+
+ /** IgniteSet.size. */
+ private static final short OP_SET_SIZE = 9020;
+
+ /** IgniteSet.clear. */
+ private static final short OP_SET_CLEAR = 9021;
+
+ /** IgniteSet.iterator. */
+ private static final short OP_SET_ITERATOR_START = 9022;
+
+ /** IgniteSet.iterator page. */
+ private static final short OP_SET_ITERATOR_GET_PAGE = 9023;
+
/** Marshaller. */
private final GridBinaryMarshaller marsh;
case OP_ATOMIC_LONG_VALUE_COMPARE_AND_SET_AND_GET:
return new ClientAtomicLongValueCompareAndSetAndGetRequest(reader);
+
+ case OP_SET_GET_OR_CREATE:
+ return new ClientIgniteSetGetOrCreateRequest(reader);
+
+ case OP_SET_CLOSE:
+ return new ClientIgniteSetCloseRequest(reader);
+
+ case OP_SET_EXISTS:
+ return new ClientIgniteSetExistsRequest(reader);
+
+ case OP_SET_VALUE_ADD:
+ return new ClientIgniteSetValueAddRequest(reader);
+
+ case OP_SET_VALUE_ADD_ALL:
+ return new ClientIgniteSetValueAddAllRequest(reader);
+
+ case OP_SET_VALUE_REMOVE:
+ return new ClientIgniteSetValueRemoveRequest(reader);
+
+ case OP_SET_VALUE_REMOVE_ALL:
+ return new ClientIgniteSetValueRemoveAllRequest(reader);
+
+ case OP_SET_VALUE_CONTAINS:
+ return new ClientIgniteSetValueContainsRequest(reader);
+
+ case OP_SET_VALUE_CONTAINS_ALL:
+ return new ClientIgniteSetValueContainsAllRequest(reader);
+
+ case OP_SET_VALUE_RETAIN_ALL:
+ return new ClientIgniteSetValueRetainAllRequest(reader);
+
+ case OP_SET_SIZE:
+ return new ClientIgniteSetSizeRequest(reader);
+
+ case OP_SET_CLEAR:
+ return new ClientIgniteSetClearRequest(reader);
+
+ case OP_SET_ITERATOR_START:
+ return new ClientIgniteSetIteratorStartRequest(reader);
+
+ case OP_SET_ITERATOR_GET_PAGE:
+ return new ClientIgniteSetIteratorGetPageRequest(reader);
}
return new ClientRawRequest(reader.readLong(), ClientStatus.INVALID_OP_CODE,
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientRequest;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
/**
* Atomic long value request.
* @return Response for non-existent atomic long.
*/
protected ClientResponse notFoundResponse() {
- return new ClientResponse(requestId(), String.format("AtomicLong with name '%s' does not exist.", name));
+ return new ClientResponse(
+ requestId(),
+ ClientStatus.RESOURCE_DOES_NOT_EXIST,
+ String.format("AtomicLong with name '%s' does not exist.", name));
}
}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Clears the IgniteSet.
+ */
+public class ClientIgniteSetClearRequest extends ClientIgniteSetRequest {
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetClearRequest(BinaryRawReader reader) {
+ super(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected ClientResponse process(IgniteSet<Object> set) {
+ set.clear();
+
+ return super.process(set);
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Closes the IgniteSet.
+ */
+public class ClientIgniteSetCloseRequest extends ClientIgniteSetRequest {
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetCloseRequest(BinaryRawReader reader) {
+ super(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ IgniteSet<Object> igniteSet = igniteSet(ctx);
+
+ if (igniteSet != null)
+ igniteSet.close();
+
+ return new ClientResponse(requestId());
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Checks if IgniteSet exists.
+ */
+public class ClientIgniteSetExistsRequest extends ClientIgniteSetRequest {
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetExistsRequest(BinaryRawReader reader) {
+ super(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ return new ClientBooleanResponse(requestId(), igniteSet(ctx) != null);
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.datastructures.GridCacheSetProxy;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientRequest;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Ignite set get or update request.
+ */
+public class ClientIgniteSetGetOrCreateRequest extends ClientRequest {
+ /** Name. */
+ private final String name;
+
+ /** Config. */
+ private final CollectionConfiguration collectionConfiguration;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetGetOrCreateRequest(BinaryRawReader reader) {
+ super(reader);
+
+ name = reader.readString();
+ boolean create = reader.readBoolean();
+
+ collectionConfiguration = create
+ ? new CollectionConfiguration()
+ .setAtomicityMode(CacheAtomicityMode.fromOrdinal(reader.readByte()))
+ .setCacheMode(CacheMode.fromOrdinal(reader.readByte()))
+ .setBackups(reader.readInt())
+ .setGroupName(reader.readString())
+ .setCollocated(reader.readBoolean())
+ : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ GridCacheSetProxy<Object> set = (GridCacheSetProxy<Object>)ctx
+ .kernalContext()
+ .grid()
+ .set(name, collectionConfiguration);
+
+ if (set == null)
+ return new Response(requestId(), false, null);
+
+ return new Response(requestId(), set.collocated(), set.delegate().context().cacheId());
+ }
+
+ /**
+ * Response.
+ */
+ private static class Response extends ClientResponse {
+ /** */
+ private final boolean collocated;
+
+ /** */
+ private final Integer cacheId;
+
+ /**
+ * Constructor.
+ *
+ * @param reqId Request id.
+ * @param collocated Collocated.
+ * @param cacheId Cache id.
+ */
+ public Response(long reqId, boolean collocated, Integer cacheId) {
+ super(reqId);
+
+ this.collocated = collocated;
+ this.cacheId = cacheId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
+ super.encode(ctx, writer);
+
+ if (cacheId != null) {
+ writer.writeBoolean(true);
+ writer.writeBoolean(collocated);
+ writer.writeInt(cacheId);
+ }
+ else
+ writer.writeBoolean(false);
+ }
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import java.util.Iterator;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientRequest;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+import static org.apache.ignite.internal.processors.platform.client.datastructures.ClientIgniteSetIteratorStartRequest.writePage;
+
+/**
+ * Ignite set iterator next page request.
+ */
+@SuppressWarnings("rawtypes")
+public class ClientIgniteSetIteratorGetPageRequest extends ClientRequest {
+ /** Page size. */
+ private final int pageSize;
+
+ /** Resource id. */
+ private final long resId;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetIteratorGetPageRequest(BinaryRawReader reader) {
+ super(reader);
+
+ resId = reader.readLong();
+ pageSize = reader.readInt();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ return new Response(requestId(), ctx.resources().get(resId));
+ }
+
+ /**
+ * Response.
+ */
+ private class Response extends ClientResponse {
+ /** Iterator. */
+ private final Iterator iter;
+
+ /**
+ * Constructor.
+ *
+ * @param reqId Request id.
+ * @param iter Iterator.
+ */
+ public Response(long reqId, Iterator iter) {
+ super(reqId);
+
+ this.iter = iter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
+ super.encode(ctx, writer);
+
+ writePage(writer, iter, pageSize);
+
+ if (!iter.hasNext())
+ ctx.resources().release(resId);
+ }
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import java.util.Iterator;
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Ignite set iterator start request.
+ */
+@SuppressWarnings("rawtypes")
+public class ClientIgniteSetIteratorStartRequest extends ClientIgniteSetRequest {
+ /** Page size. */
+ private final int pageSize;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetIteratorStartRequest(BinaryRawReader reader) {
+ super(reader);
+
+ pageSize = reader.readInt();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected ClientResponse process(IgniteSet<Object> set) {
+ return new Response(requestId(), set.iterator());
+ }
+
+ /**
+ * Writes next page to the writer.
+ *
+ * @param writer Writer.
+ */
+ static void writePage(BinaryRawWriterEx writer, Iterator iter, int pageSize) {
+ int cntPos = writer.reserveInt();
+ int cnt = 0;
+
+ while (cnt < pageSize && iter.hasNext()) {
+ writer.writeObject(iter.next());
+
+ cnt++;
+ }
+
+ writer.writeInt(cntPos, cnt);
+ writer.writeBoolean(iter.hasNext());
+ }
+
+ /**
+ * Response.
+ */
+ private class Response extends ClientResponse {
+ /** Iterator. */
+ private final Iterator iter;
+
+ /**
+ * Constructor.
+ *
+ * @param reqId Request id.
+ * @param iter Iterator.
+ */
+ public Response(long reqId, Iterator iter) {
+ super(reqId);
+
+ this.iter = iter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void encode(ClientConnectionContext ctx, BinaryRawWriterEx writer) {
+ super.encode(ctx, writer);
+
+ writePage(writer, iter, pageSize);
+
+ if (iter.hasNext()) {
+ long resId = ctx.resources().put(iter);
+ writer.writeLong(resId);
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Base class for all IgniteSet single-key requests.
+ */
+public abstract class ClientIgniteSetKeyRequest extends ClientIgniteSetRequest {
+ /** Key. */
+ private final Object key;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ ClientIgniteSetKeyRequest(BinaryRawReaderEx reader) {
+ super(reader);
+
+ // Clients can enable deserialized values on server so that user objects are stored the same way
+ // as if we were using "thick" API.
+ // This is needed when both thick and thin APIs work with the same IgniteSet AND custom user types.
+ boolean keepBinary = reader.readBoolean();
+ key = keepBinary ? reader.readObjectDetached() : reader.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ IgniteSet<Object> igniteSet = igniteSet(ctx);
+
+ if (igniteSet == null)
+ return notFoundResponse();
+
+ return process(igniteSet, key);
+ }
+
+ /**
+ * Processes the key request.
+ *
+ * @param set Ignite set.
+ * @param key Key.
+ * @return Response.
+ */
+ abstract ClientResponse process(IgniteSet<Object> set, Object key);
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Base class for all IgniteSet single-key requests.
+ */
+public abstract class ClientIgniteSetKeysRequest extends ClientIgniteSetRequest {
+ /** Key. */
+ private final List<Object> keys;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ ClientIgniteSetKeysRequest(BinaryRawReaderEx reader) {
+ super(reader);
+
+ // Clients can enable deserialized values on server so that user objects are stored the same way
+ // as if we were using "thick" API.
+ // This is needed when both thick and thin APIs work with the same IgniteSet AND custom user types.
+ boolean keepBinary = reader.readBoolean();
+
+ int size = reader.readInt();
+
+ keys = new ArrayList<>(size);
+
+ for (int i = 0; i < size; i++)
+ keys.add(keepBinary ? reader.readObjectDetached() : reader.readObject());
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ IgniteSet<Object> igniteSet = igniteSet(ctx);
+
+ if (igniteSet == null)
+ return notFoundResponse();
+
+ return process(igniteSet, keys);
+ }
+
+ /**
+ * Processes the key request.
+ *
+ * @param set Ignite set.
+ * @param keys Keys.
+ * @return Response.
+ */
+ abstract ClientResponse process(IgniteSet<Object> set, List<Object> keys);
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientRequest;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
+
+/**
+ * Ignite set get or update request.
+ */
+public class ClientIgniteSetRequest extends ClientRequest {
+ /** */
+ private final String name;
+
+ /** */
+ private final int cacheId;
+
+ /** */
+ private final boolean collocated;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetRequest(BinaryRawReader reader) {
+ super(reader);
+
+ name = reader.readString();
+ cacheId = reader.readInt();
+ collocated = reader.readBoolean();
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ IgniteSet<Object> igniteSet = igniteSet(ctx);
+
+ if (igniteSet == null)
+ return notFoundResponse();
+
+ return process(igniteSet);
+ }
+
+ /**
+ * Processes the request.
+ *
+ * @param set Ignite set.
+ * @return Response.
+ */
+ protected ClientResponse process(IgniteSet<Object> set) {
+ return new ClientResponse(requestId());
+ }
+
+ /**
+ * Gets the name.
+ *
+ * @return Set name.
+ */
+ protected String name() {
+ return name;
+ }
+
+ /**
+ * Gets the IgniteSet.
+ *
+ * @param ctx Context.
+ * @return IgniteSet or null.
+ */
+ protected <T> IgniteSet<T> igniteSet(ClientConnectionContext ctx) {
+ // Thin client only works in separated mode, because non-separated mode was discontinued earlier.
+ return ctx.kernalContext().grid().set(name, cacheId, collocated, true);
+ }
+
+ /**
+ * Gets a response for non-existent set.
+ *
+ * @return Response for non-existent set.
+ */
+ protected ClientResponse notFoundResponse() {
+ return new ClientResponse(
+ requestId(),
+ ClientStatus.RESOURCE_DOES_NOT_EXIST,
+ String.format("IgniteSet with name '%s' does not exist.", name));
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.internal.processors.platform.client.ClientIntResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Gets IgniteSet size.
+ */
+public class ClientIgniteSetSizeRequest extends ClientIgniteSetRequest {
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetSizeRequest(BinaryRawReader reader) {
+ super(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected ClientResponse process(IgniteSet<Object> set) {
+ return new ClientIntResponse(requestId(), set.size());
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import java.util.List;
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Set addAll request.
+ */
+public class ClientIgniteSetValueAddAllRequest extends ClientIgniteSetKeysRequest {
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetValueAddAllRequest(BinaryRawReaderEx reader) {
+ super(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override ClientResponse process(IgniteSet<Object> set, List<Object> keys) {
+ return new ClientBooleanResponse(requestId(), set.addAll(keys));
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Set add request.
+ */
+public class ClientIgniteSetValueAddRequest extends ClientIgniteSetKeyRequest {
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetValueAddRequest(BinaryRawReaderEx reader) {
+ super(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override ClientResponse process(IgniteSet<Object> set, Object key) {
+ return new ClientBooleanResponse(requestId(), set.add(key));
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import java.util.List;
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Set contains all request.
+ */
+public class ClientIgniteSetValueContainsAllRequest extends ClientIgniteSetKeysRequest {
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetValueContainsAllRequest(BinaryRawReaderEx reader) {
+ super(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override ClientResponse process(IgniteSet<Object> set, List<Object> keys) {
+ return new ClientBooleanResponse(requestId(), set.containsAll(keys));
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Set contains request.
+ */
+public class ClientIgniteSetValueContainsRequest extends ClientIgniteSetKeyRequest {
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetValueContainsRequest(BinaryRawReaderEx reader) {
+ super(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override ClientResponse process(IgniteSet<Object> set, Object key) {
+ return new ClientBooleanResponse(requestId(), set.contains(key));
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import java.util.List;
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Set value remove all request.
+ */
+public class ClientIgniteSetValueRemoveAllRequest extends ClientIgniteSetKeysRequest {
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetValueRemoveAllRequest(BinaryRawReaderEx reader) {
+ super(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override ClientResponse process(IgniteSet<Object> set, List<Object> keys) {
+ //noinspection SlowAbstractSetRemoveAll
+ return new ClientBooleanResponse(requestId(), set.removeAll(keys));
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Set value remove request.
+ */
+public class ClientIgniteSetValueRemoveRequest extends ClientIgniteSetKeyRequest {
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetValueRemoveRequest(BinaryRawReaderEx reader) {
+ super(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override ClientResponse process(IgniteSet<Object> set, Object key) {
+ return new ClientBooleanResponse(requestId(), set.remove(key));
+ }
+}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.datastructures;
+
+import java.util.List;
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Set retain all request.
+ */
+public class ClientIgniteSetValueRetainAllRequest extends ClientIgniteSetKeysRequest {
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientIgniteSetValueRetainAllRequest(BinaryRawReaderEx reader) {
+ super(reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override ClientResponse process(IgniteSet<Object> set, List<Object> keys) {
+ return new ClientBooleanResponse(requestId(), set.retainAll(keys));
+ }
+}
org.apache.ignite.internal.processors.datastructures.GridCacheSetImpl$5
org.apache.ignite.internal.processors.datastructures.GridCacheSetImpl$SumReducer
org.apache.ignite.internal.processors.datastructures.GridCacheSetItemKey
+org.apache.ignite.internal.processors.datastructures.GridCacheSetItemKey$HashCodeHolder
org.apache.ignite.internal.processors.datastructures.GridCacheSetProxy
org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate
org.apache.ignite.internal.processors.datastructures.VolatileAtomicDataStructureValue
String nullOpsNames = nullOps.stream().map(Enum::name).collect(Collectors.joining(", "));
- long expectedNullCount = 15;
+ long expectedNullCount = 16;
String msg = nullOps.size()
+ " operation codes do not have public equivalent. When adding new codes, update ClientOperationType too. Missing ops: "
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.client.ClientAtomicConfiguration;
import org.apache.ignite.client.ClientAtomicLong;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.junit.Test;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
/**
* Tests client atomic long.
+ * Partition awareness tests are in {@link ThinClientPartitionAwarenessStableTopologyTest#testAtomicLong()}.
*/
public class AtomicLongTest extends AbstractThinClientTest {
/** {@inheritDoc} */
assertEquals(1, defaultCache.configuration().getBackups());
}
+ /**
+ * Tests atomic long with same name and group name, but different cache modes.
+ */
+ @Test
+ public void testSameNameDifferentOptionsDoesNotCreateSecondAtomic() {
+ String groupName = "testSameNameDifferentOptions";
+
+ ClientAtomicConfiguration cfg1 = new ClientAtomicConfiguration()
+ .setCacheMode(CacheMode.REPLICATED)
+ .setGroupName(groupName);
+
+ ClientAtomicConfiguration cfg2 = new ClientAtomicConfiguration()
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setGroupName(groupName);
+
+ String name = "testSameNameDifferentOptionsDoesNotCreateSecondAtomic";
+
+ try (IgniteClient client = startClient(0)) {
+ ClientAtomicLong al1 = client.atomicLong(name, cfg1, 1, true);
+ ClientAtomicLong al2 = client.atomicLong(name, cfg2, 2, true);
+ ClientAtomicLong al3 = client.atomicLong(name, 3, true);
+
+ assertEquals(1, al1.get());
+ assertEquals(1, al2.get());
+ assertEquals(3, al3.get());
+ }
+
+ List<IgniteInternalCache<?, ?>> caches = grid(0).cachesx().stream()
+ .filter(c -> c.name().contains(groupName))
+ .collect(Collectors.toList());
+
+ assertEquals(1, caches.size());
+
+ IgniteInternalCache<?, ?> replicatedCache = caches.get(0);
+
+ assertEquals("ignite-sys-atomic-cache@testSameNameDifferentOptions", replicatedCache.name());
+ assertEquals(Integer.MAX_VALUE, replicatedCache.configuration().getBackups());
+ }
+
/**
* Asserts that "does not exist" error is thrown.
*
ClientException ex = (ClientException)assertThrows(null, callable, ClientException.class, null);
assertContains(null, ex.getMessage(), "AtomicLong with name '" + name + "' does not exist.");
+ assertEquals(ClientStatus.RESOURCE_DOES_NOT_EXIST, ((ClientServerError)ex.getCause()).getCode());
}
}
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.lang.reflect.Field;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import com.google.common.collect.ImmutableList;
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.client.ClientAutoCloseableIterator;
+import org.apache.ignite.client.ClientCollectionConfiguration;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientIgniteSet;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.datastructures.GridCacheSetProxy;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests client set.
+ * Partition awareness tests are in {@link ThinClientPartitionAwarenessStableTopologyTest#testIgniteSet()}.
+ */
+@SuppressWarnings({"rawtypes", "ZeroLengthArrayAllocation", "ThrowableNotThrown"})
+public class IgniteSetTest extends AbstractThinClientTest {
+ /** Client. */
+ static IgniteClient client;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(1);
+ client = startClient(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected ClientConfiguration getClientConfiguration() {
+ return super.getClientConfiguration().setPartitionAwarenessEnabled(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ client.close();
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * Tests that missing set returns null.
+ */
+ @Test
+ public void testGetNonExistentSetReturnsNull() {
+ assertNull(client.set("non-existent", null));
+ }
+
+ /**
+ * Tests that closed set throws exceptions.
+ */
+ @Test
+ public void testCloseThenUseThrowsException() {
+ ClientIgniteSet<Integer> set = client.set("testCloseThenUseThrowsException", new ClientCollectionConfiguration());
+ ClientIgniteSet<Integer> set2 = client.set(set.name(), null);
+
+ set.add(1);
+ set.close();
+
+ assertThrowsClosed(set);
+ assertThrowsClosed(set2);
+
+ assertTrue(set.removed());
+ assertTrue(set2.removed());
+ }
+
+ /**
+ * Tests creating a new set with the old name.
+ */
+ @Test
+ public void testCloseAndCreateWithSameName() {
+ ClientIgniteSet<Integer> oldSet = client.set("testCreateCloseCreateRemovesOldData", new ClientCollectionConfiguration());
+
+ oldSet.add(1);
+ oldSet.close();
+
+ assertTrue(oldSet.removed());
+
+ ClientIgniteSet<Integer> newSet = client.set(oldSet.name(), new ClientCollectionConfiguration());
+
+ assertEquals(0, newSet.size());
+
+ // Set is identified by id, so it is no longer removed.
+ assertFalse(newSet.removed());
+ assertFalse(oldSet.removed());
+ }
+
+ /**
+ * Tests basic usage.
+ */
+ @Test
+ public void testAddRemoveContains() {
+ ClientIgniteSet<String> set = client.set("testBasicUsage", new ClientCollectionConfiguration());
+
+ assertTrue(set.isEmpty());
+
+ set.add("foo");
+ set.add("bar");
+
+ assertTrue(set.contains("foo"));
+ assertTrue(set.contains("bar"));
+ assertFalse(set.contains("baz"));
+
+ set.remove("foo");
+ assertFalse(set.contains("foo"));
+
+ assertEquals(1, set.size());
+ assertEquals("bar", set.iterator().next());
+ }
+
+ /**
+ * Tests addAll.
+ */
+ @Test
+ public void testAddAll() {
+ ClientIgniteSet<Integer> set = client.set("testAddAll", new ClientCollectionConfiguration());
+
+ assertTrue(set.addAll(ImmutableList.of(1, 3)));
+ assertTrue(set.contains(1));
+ assertFalse(set.contains(2));
+ assertTrue(set.contains(3));
+ assertEquals(2, set.size());
+
+ assertTrue(set.addAll(ImmutableList.of(1, 2, 3)));
+ assertTrue(set.contains(1));
+ assertTrue(set.contains(2));
+ assertTrue(set.contains(3));
+ assertEquals(3, set.size());
+
+ assertFalse(set.addAll(ImmutableList.of(2, 3)));
+ assertFalse(set.addAll(ImmutableList.of(3)));
+ assertFalse(set.addAll(ImmutableList.of()));
+
+ assertEquals(3, set.size());
+ }
+
+ /**
+ * Tests containsAll.
+ */
+ @Test
+ @SuppressWarnings("SuspiciousMethodCalls")
+ public void testContainsAll() {
+ ClientIgniteSet<Integer> set = client.set("testContainsAll", new ClientCollectionConfiguration());
+ set.addAll(ImmutableList.of(1, 2, 3));
+
+ assertTrue(set.containsAll(ImmutableList.of(1)));
+ assertTrue(set.containsAll(ImmutableList.of(1, 2)));
+ assertTrue(set.containsAll(ImmutableList.of(2, 1)));
+ assertTrue(set.containsAll(ImmutableList.of(3, 1, 2)));
+
+ assertFalse(set.containsAll(ImmutableList.of()));
+ assertFalse(set.containsAll(ImmutableList.of(0)));
+ assertFalse(set.containsAll(ImmutableList.of(0, 1)));
+ assertFalse(set.containsAll(ImmutableList.of(1, 2, 4)));
+ }
+
+ /**
+ * Tests removeAll.
+ */
+ @Test
+ @SuppressWarnings({"SlowAbstractSetRemoveAll", "SuspiciousMethodCalls"})
+ public void testRemoveAll() {
+ ClientIgniteSet<Integer> set = client.set("testRemoveAll", new ClientCollectionConfiguration());
+ set.addAll(ImmutableList.of(1, 2, 3));
+
+ assertFalse(set.removeAll(ImmutableList.of()));
+ assertFalse(set.removeAll(ImmutableList.of(0)));
+ assertFalse(set.removeAll(ImmutableList.of(0, 4)));
+
+ assertEquals(3, set.size());
+
+ assertTrue(set.removeAll(ImmutableList.of(5, 4, 3, 1, 0)));
+
+ assertEquals(1, set.size());
+ assertTrue(set.contains(2));
+ }
+
+ /**
+ * Tests retainAll.
+ */
+ @Test
+ @SuppressWarnings("SuspiciousMethodCalls")
+ public void testRetainAll() {
+ ClientIgniteSet<Integer> set = client.set("testRemoveAll", new ClientCollectionConfiguration());
+
+ assertFalse(set.retainAll(ImmutableList.of()));
+
+ set.addAll(ImmutableList.of(1, 2, 3));
+
+ assertFalse(set.retainAll(ImmutableList.of(3, 2, 1, 4)));
+ assertFalse(set.retainAll(ImmutableList.of(1, 2, 3)));
+ assertEquals(3, set.size());
+
+ assertTrue(set.retainAll(ImmutableList.of(1, 4, 7)));
+ assertEquals(1, set.size());
+ assertTrue(set.contains(1));
+
+ // retainAll with empty list: clear the collection and get a boolean value indicating if it was empty or not.
+ assertTrue(set.retainAll(ImmutableList.of()));
+ assertTrue(set.isEmpty());
+ }
+
+ /**
+ * Tests user object types as set values.
+ */
+ @Test
+ public void testUserObject() {
+ ClientIgniteSet<UserObj> clientSet = client.set("testUserObject", new ClientCollectionConfiguration());
+
+ UserObj obj1 = new UserObj(1, "a");
+ UserObj obj2 = new UserObj(2, "a");
+
+ clientSet.add(obj1);
+ clientSet.add(obj2);
+
+ assertTrue(clientSet.contains(obj1));
+ assertTrue(clientSet.contains(new UserObj(1, "a")));
+ assertTrue(clientSet.containsAll(ImmutableList.of(obj1, obj2)));
+
+ assertFalse(clientSet.contains(new UserObj(1, "b")));
+ }
+
+ /**
+ * Tests user object types as set values with server-side API interop.
+ */
+ @Test
+ public void testUserObjectClientServer() {
+ ClientIgniteSet<UserObj> clientSet = client.set("testUserObjectClientServer", new ClientCollectionConfiguration());
+
+ // By default, Client sends obj as BinaryObject, resulting in a different behavior.
+ // When thick and thin APIs are used with the same user-defined classes together,
+ // it means that classes are available on the server, and we can deserialize the obj to enable matching behavior.
+ clientSet.serverKeepBinary(false);
+
+ IgniteSet<UserObj> serverSet = ignite(0).set(clientSet.name(), null);
+
+ clientSet.add(new UserObj(1, "client"));
+ serverSet.add(new UserObj(2, "server"));
+
+ assertTrue(clientSet.contains(new UserObj(1, "client")));
+ assertTrue(clientSet.contains(new UserObj(2, "server")));
+
+ assertTrue(serverSet.contains(new UserObj(1, "client")));
+ assertTrue(serverSet.contains(new UserObj(2, "server")));
+
+ assertFalse(clientSet.contains(new UserObj(1, "x")));
+ assertFalse(serverSet.contains(new UserObj(1, "x")));
+ }
+
+ /**
+ * Tests config propagation.
+ */
+ @Test
+ public void testConfigPropagation() throws Exception {
+ String groupName = "grp-testConfigPropagation";
+
+ ClientCollectionConfiguration cfg = new ClientCollectionConfiguration()
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setBackups(7)
+ .setColocated(true)
+ .setGroupName(groupName);
+
+ CollectionConfiguration serverCfg = new CollectionConfiguration()
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setBackups(7)
+ .setCollocated(true)
+ .setGroupName(groupName);
+
+ ClientIgniteSet<UserObj> set = client.set("testConfigPropagation", cfg);
+
+ GridCacheSetProxy serverSet = (GridCacheSetProxy)ignite(0).set(set.name(), serverCfg);
+
+ Field field = GridCacheSetProxy.class.getDeclaredField("cctx");
+ field.setAccessible(true);
+ GridCacheContext cctx = (GridCacheContext)field.get(serverSet);
+
+ assertTrue(set.colocated());
+ assertFalse(set.removed());
+ assertEquals("testConfigPropagation", set.name());
+ assertEquals(7, cctx.config().getBackups());
+ assertEquals(CacheMode.PARTITIONED, cctx.config().getCacheMode());
+ assertEquals(CacheAtomicityMode.TRANSACTIONAL, cctx.config().getAtomicityMode());
+ assertEquals(groupName, cctx.config().getGroupName());
+ }
+
+ /**
+ * Tests different cache groups.
+ */
+ @Test
+ public void testSameNameInDifferentGroups() {
+ String name = "testSameNameInDifferentGroups";
+ ClientCollectionConfiguration cfg1 = new ClientCollectionConfiguration();
+
+ ClientCollectionConfiguration cfg2 = new ClientCollectionConfiguration()
+ .setGroupName("gp1");
+
+ ClientCollectionConfiguration cfg3 = new ClientCollectionConfiguration()
+ .setGroupName("gp2")
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+ ClientIgniteSet<Integer> set1 = client.set(name, cfg1);
+ ClientIgniteSet<Integer> set2 = client.set(name, cfg2);
+ ClientIgniteSet<Integer> set3 = client.set(name, cfg3);
+
+ set1.add(1);
+ set2.add(2);
+ set3.add(3);
+
+ assertTrue(set1.contains(1));
+ assertTrue(set2.contains(2));
+ assertTrue(set3.contains(3));
+
+ assertFalse(set1.contains(2));
+ assertFalse(set2.contains(3));
+ assertFalse(set3.contains(1));
+ }
+
+ /**
+ * Tests same set name with different options.
+ */
+ @Test
+ public void testSameNameDifferentOptions() {
+ String name = "testSameNameDifferentOptions";
+ ClientCollectionConfiguration cfg1 = new ClientCollectionConfiguration()
+ .setGroupName("gp1");
+
+ ClientCollectionConfiguration cfg2 = new ClientCollectionConfiguration()
+ .setGroupName("gp1")
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+ ClientIgniteSet<Integer> set1 = client.set(name, cfg1);
+ ClientIgniteSet<Integer> set2 = client.set(name, cfg2);
+
+ set1.add(2);
+ set2.add(3);
+
+ assertTrue(set1.contains(2));
+ assertTrue(set2.contains(3));
+
+ assertFalse(set1.contains(3));
+ assertFalse(set2.contains(1));
+ }
+
+ /**
+ * Tests iterator over an empty set.
+ */
+ @Test
+ public void testIteratorEmpty() {
+ ClientIgniteSet<Integer> set = client.set("testIteratorEmpty", new ClientCollectionConfiguration());
+
+ ClientAutoCloseableIterator<Integer> iterator = set.iterator();
+
+ assertEquals(1024, set.pageSize());
+ assertFalse(iterator.hasNext());
+ GridTestUtils.assertThrows(null, iterator::next, NoSuchElementException.class, null);
+ }
+
+ /**
+ * Tests that iterator closes itself when the last page is retrieved.
+ */
+ @Test
+ public void testIteratorClosesOnLastPage() throws Exception {
+ ClientIgniteSet<Integer> set = client.set("testCloseBeforeEnd", new ClientCollectionConfiguration());
+ set.pageSize(1);
+
+ ImmutableList<Integer> keys = ImmutableList.of(1, 2, 3);
+ set.addAll(keys);
+
+ ClientAutoCloseableIterator<Integer> iter = set.iterator();
+
+ assertFalse(isIteratorClosed(iter));
+ assertTrue(iter.hasNext());
+
+ iter.next();
+
+ assertFalse(isIteratorClosed(iter));
+ assertTrue(iter.hasNext());
+
+ iter.next();
+
+ assertTrue(isIteratorClosed(iter));
+ assertTrue(iter.hasNext());
+
+ iter.next();
+
+ assertFalse(iter.hasNext());
+ }
+
+ /**
+ * Tests closing the iterator before it is finished.
+ */
+ @Test
+ public void testCloseBeforeEnd() throws Exception {
+ ClientIgniteSet<Integer> set = client.set("testCloseBeforeEnd", new ClientCollectionConfiguration());
+ set.pageSize(1);
+
+ ImmutableList<Integer> keys = ImmutableList.of(1, 2, 3);
+ set.addAll(keys);
+
+ ClientAutoCloseableIterator<Integer> iter = set.iterator();
+
+ assertTrue(iter.hasNext());
+ iter.close();
+
+ assertFalse(iter.hasNext());
+ }
+
+ /**
+ * Tests iterator in a foreach loop.
+ */
+ @Test
+ public void testIteratorForeach() {
+ ClientIgniteSet<Integer> set = client.set("testIteratorForeach", new ClientCollectionConfiguration());
+ set.pageSize(2);
+
+ ImmutableList<Integer> keys = ImmutableList.of(1, 2, 3);
+ set.addAll(keys);
+
+ int count = 0;
+
+ for (Integer k : set) {
+ assertTrue(keys.contains(k));
+ count++;
+ }
+
+ assertEquals(keys.size(), count);
+ }
+
+ /**
+ * Tests iterator with data modifications.
+ */
+ @Test
+ public void testModifyWhileIterating() {
+ ClientIgniteSet<Integer> set = client.set("testModifyWhileIterating", new ClientCollectionConfiguration());
+ set.pageSize(1);
+
+ ImmutableList<Integer> keys = ImmutableList.of(1, 2, 3);
+ set.addAll(keys);
+
+ ClientAutoCloseableIterator<Integer> iterator = set.iterator();
+
+ set.remove(3);
+ assertTrue(keys.contains(iterator.next()));
+
+ set.remove(2);
+ assertTrue(keys.contains(iterator.next()));
+
+ assertFalse(iterator.hasNext());
+ }
+
+ /**
+ * Tests toArray on empty set.
+ */
+ @Test
+ public void testToArrayEmpty() {
+ ClientIgniteSet<Integer> set = client.set("testToArrayEmpty", new ClientCollectionConfiguration());
+
+ assertEquals(0, set.toArray().length);
+ assertEquals(0, set.toArray(new Integer[0]).length);
+ }
+
+ /**
+ * Tests toArray.
+ */
+ @Test
+ public void testToArray() {
+ for (int i = 1; i < 10; i++)
+ testToArray(i);
+ }
+
+ /**
+ * Tests toArray.
+ */
+ public void testToArray(int pageSize) {
+ ClientIgniteSet<Integer> set = client.set("testToArray", new ClientCollectionConfiguration());
+ set.pageSize(pageSize);
+
+ ImmutableList<Integer> keys = ImmutableList.of(1, 2, 3, 4, 5);
+ set.addAll(keys);
+
+ Integer[] resTyped = set.toArray(new Integer[0]);
+
+ assertEquals(5, resTyped.length);
+
+ for (Integer k : resTyped)
+ assertTrue(keys.contains(k));
+
+ Object[] resObjects = set.toArray();
+
+ assertEquals(5, resObjects.length);
+
+ for (Object k : resObjects)
+ assertTrue(keys.contains((Integer)k));
+ }
+
+ /**
+ * Asserts that usage throws closed exception.
+ */
+ @SuppressWarnings("ThrowableNotThrown")
+ private static void assertThrowsClosed(ClientIgniteSet<Integer> set) {
+ String msg = "IgniteSet with name '" + set.name() + "' does not exist.";
+ GridTestUtils.assertThrows(null, set::size, ClientException.class, msg);
+ }
+
+ /**
+ * Returns a value indicating whether iterator resources are closed.
+ *
+ * @param iter Iterator.
+ * @return Whether iterator resources are closed.
+ */
+ private static boolean isIteratorClosed(ClientAutoCloseableIterator<Integer> iter) throws Exception {
+ Field field = iter.getClass().getDeclaredField("resourceId");
+ field.setAccessible(true);
+
+ return field.get(iter) == null;
+ }
+
+ /**
+ * Custom user class.
+ */
+ private static class UserObj {
+ /** */
+ public final int id;
+
+ /** */
+ public final String val;
+
+ /** */
+ public UserObj(int id, String val) {
+ this.id = id;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UserObj userObj = (UserObj)o;
+ return id == userObj.id && Objects.equals(val, userObj.val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(id, val);
+ }
+ }
+}
import java.util.function.Function;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.client.ClientAtomicConfiguration;
import org.apache.ignite.client.ClientAtomicLong;
import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientCollectionConfiguration;
+import org.apache.ignite.client.ClientIgniteSet;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongEx;
}
}
+ /**
+ * Tests {@link ClientIgniteSet} partition awareness.
+ * Other client set tests are in {@link IgniteSetTest}.
+ */
+ @Test
+ public void testIgniteSet() {
+ testIgniteSet("testIgniteSet", null, CacheAtomicityMode.ATOMIC);
+ testIgniteSet("testIgniteSet2", null, CacheAtomicityMode.TRANSACTIONAL);
+ testIgniteSet("testIgniteSet3", "grp-testIgniteSet3", CacheAtomicityMode.ATOMIC);
+ testIgniteSet("testIgniteSet4", "grp-testIgniteSet4", CacheAtomicityMode.TRANSACTIONAL);
+ }
+
+ /**
+ * Tests {@link ClientIgniteSet} partition awareness.
+ */
+ private void testIgniteSet(String name, String groupName, CacheAtomicityMode mode) {
+ ClientCollectionConfiguration cfg = new ClientCollectionConfiguration()
+ .setGroupName(groupName)
+ .setAtomicityMode(mode)
+ .setBackups(1);
+
+ ClientIgniteSet<String> clientSet = client.set(name, cfg);
+
+ if (groupName == null)
+ groupName = "default-ds-group";
+
+ String cacheName = "datastructures_" + mode + "_PARTITIONED_1@" + groupName + "#SET_" + clientSet.name();
+ IgniteInternalCache<Object, Object> cache = grid(0).context().cache().cache(cacheName);
+
+ // Warm up.
+ clientSet.add("a");
+ opsQueue.clear();
+
+ // Test.
+ for (int i = 0; i < 10; i++) {
+ String key = "b" + i;
+ clientSet.add(key);
+
+ TestTcpClientChannel opCh = affinityChannel(key, cache);
+ assertOpOnChannel(opCh, ClientOperation.OP_SET_VALUE_ADD);
+ }
+ }
+
+ /**
+ * Tests {@link ClientIgniteSet} partition awareness in colocated mode.
+ */
+ @Test
+ public void testIgniteSetCollocated() {
+ testIgniteSetCollocated("testIgniteSetCollocated", null, CacheAtomicityMode.ATOMIC);
+ testIgniteSetCollocated("testIgniteSetCollocated2", null, CacheAtomicityMode.TRANSACTIONAL);
+ testIgniteSetCollocated("testIgniteSetCollocated3", "grp-testIgniteSetCollocated3", CacheAtomicityMode.ATOMIC);
+ testIgniteSetCollocated("testIgniteSetCollocated4", "grp-testIgniteSetCollocated4",
+ CacheAtomicityMode.TRANSACTIONAL);
+ }
+
+ /**
+ * Tests {@link ClientIgniteSet} partition awareness in colocated mode.
+ */
+ public void testIgniteSetCollocated(String name, String groupName, CacheAtomicityMode mode) {
+ ClientCollectionConfiguration cfg = new ClientCollectionConfiguration()
+ .setColocated(true)
+ .setGroupName(groupName)
+ .setAtomicityMode(mode)
+ .setBackups(1);
+
+ ClientIgniteSet<String> clientSet = client.set(name, cfg);
+
+ if (groupName == null)
+ groupName = "default-ds-group";
+
+ String cacheName = "datastructures_" + mode + "_PARTITIONED_1@" + groupName;
+ IgniteInternalCache<Object, Object> cache = grid(0).context().cache().cache(cacheName);
+
+ // Warm up.
+ clientSet.add("a");
+ opsQueue.clear();
+
+ // Test.
+ for (int i = 0; i < 10; i++) {
+ String key = "b" + i;
+ clientSet.add(key);
+
+ TestTcpClientChannel opCh = affinityChannel(clientSet.name().hashCode(), cache);
+ assertOpOnChannel(opCh, ClientOperation.OP_SET_VALUE_ADD);
+ }
+
+ // Test iterator.
+ clientSet.toArray();
+
+ TestTcpClientChannel opCh = affinityChannel(clientSet.name().hashCode(), cache);
+ assertOpOnChannel(opCh, ClientOperation.OP_SET_ITERATOR_START);
+ }
+
/**
* Test atomic long.
*/
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSet;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.configuration.CacheConfiguration;
private void testApi(boolean collocated) throws Exception {
CollectionConfiguration colCfg = config(collocated);
- assertNotNull(grid(0).set(SET_NAME, colCfg));
+ IgniteSet<Object> set0 = grid(0).set(SET_NAME, colCfg);
+ assertNotNull(set0);
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
// Add, isEmpty.
- assertTrue(grid(0).set(SET_NAME, null).add(1));
+ assertTrue(set0.add(1));
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
// Remove.
- assertTrue(grid(0).set(SET_NAME, null).remove(1));
+ assertTrue(set0.remove(1));
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
for (int i = ITEMS - 10; i < ITEMS; i++)
rmvCol.add(i);
- assertTrue(grid(0).set(SET_NAME, null).removeAll(rmvCol));
+ assertTrue(set0.removeAll(rmvCol));
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
// Add all.
- assertTrue(grid(0).set(SET_NAME, null).addAll(rmvCol));
+ assertTrue(set0.addAll(rmvCol));
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
// Retain all.
- assertTrue(grid(0).set(SET_NAME, null).retainAll(rmvCol));
+ assertTrue(set0.retainAll(rmvCol));
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
assertTrue(set.contains(val));
}
+ assertFalse(set0.isEmpty());
+
+ // retainAll with empty list: clear the collection and get a boolean value indicating if it was empty or not.
+ assertTrue(set0.retainAll(new ArrayList<>()));
+ assertTrue(set0.isEmpty());
+
// Clear.
- grid(0).set(SET_NAME, null).clear();
+ set0.add(1);
+ set0.clear();
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
set3.close();
}
+ /**
+ * Tests that new set with the same name as an old removed set does not contain old data.
+ */
+ @Test
+ @SuppressWarnings("ThrowableNotThrown")
+ public void testCloseAndCreateWithSameName() {
+ Ignite ignite = grid(0);
+
+ CollectionConfiguration cfg = collectionConfiguration();
+ IgniteSet<Integer> oldSet = ignite.set("testRemoveAndCreateWithSameName", cfg);
+ IgniteSet<Integer> oldSet2 = ignite.set(oldSet.name(), null);
+
+ oldSet.add(1);
+ oldSet.close();
+
+ IgniteSet<Integer> newSet = ignite.set(oldSet.name(), cfg);
+
+ assertEquals(0, newSet.size());
+
+ assertTrue(oldSet.removed());
+ assertTrue(oldSet2.removed());
+
+ String msg = "Set has been removed from cache";
+ GridTestUtils.assertThrows(null, oldSet::size, IllegalStateException.class, msg);
+ GridTestUtils.assertThrows(null, oldSet2::size, IllegalStateException.class, msg);
+
+ newSet.close();
+ }
+
+ /**
+ * Tests multiple sets with the same name but different cache options.
+ */
+ @Test
+ public void testSameNameDifferentOptions() {
+ Ignite ignite = grid(0);
+
+ String name = "testSameNameDifferentOptions";
+
+ CollectionConfiguration cfg1 = new CollectionConfiguration()
+ .setGroupName("gp1");
+
+ CollectionConfiguration cfg2 = new CollectionConfiguration()
+ .setGroupName("gp1")
+ .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+ IgniteSet<Integer> set1 = ignite.set(name, cfg1);
+ IgniteSet<Integer> set1_1 = ignite.set(name, cfg1);
+ IgniteSet<Integer> set2 = ignite.set(name, cfg2);
+ IgniteSet<Integer> set2_2 = ignite.set(name, cfg2);
+
+ set1.add(1);
+
+ assertEquals(1, set1.size());
+ assertEquals(1, set1_1.size());
+ assertEquals(0, set2.size());
+ assertEquals(0, set2_2.size());
+
+ set1.close();
+ set2.close();
+ }
+
/**
* @param set Set.
* @param size Expected size.
throw new UnsupportedOperationException("Operation isn't supported yet.");
}
+ /** {@inheritDoc} */
+ @Override public <T> IgniteSet<T> set(String name, int cacheId, boolean collocated, boolean separated) throws IgniteException {
+ throw new UnsupportedOperationException("Operation isn't supported yet.");
+ }
+
/** {@inheritDoc} */
@Override public IgniteCompute compute() {
throw new UnsupportedOperationException("Operation isn't supported yet.");
import org.apache.ignite.internal.client.thin.ClusterApiTest;
import org.apache.ignite.internal.client.thin.ClusterGroupTest;
import org.apache.ignite.internal.client.thin.ComputeTaskTest;
+import org.apache.ignite.internal.client.thin.IgniteSetTest;
import org.apache.ignite.internal.client.thin.OptimizedMarshallerClassesCachedTest;
import org.apache.ignite.internal.client.thin.ReliableChannelTest;
import org.apache.ignite.internal.client.thin.ServicesBinaryArraysTests;
TimeoutTest.class,
OptimizedMarshallerClassesCachedTest.class,
AtomicLongTest.class,
- BinaryConfigurationTest.class
+ BinaryConfigurationTest.class,
+ IgniteSetTest.class
})
public class ClientTestSuite {
// No-op.
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.ClientCluster;
import org.apache.ignite.client.ClientClusterGroup;
+import org.apache.ignite.client.ClientCollectionConfiguration;
import org.apache.ignite.client.ClientCompute;
import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientIgniteSet;
import org.apache.ignite.client.ClientServices;
import org.apache.ignite.client.ClientTransactions;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.IgniteClientFuture;
import org.apache.ignite.configuration.ClientConfiguration;
+import org.jetbrains.annotations.Nullable;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.event.ContextRefreshedEvent;
return cli.atomicLong(name, cfg, initVal, create);
}
+ /** {@inheritDoc} */
+ @Override public <T> ClientIgniteSet<T> set(String name, @Nullable ClientCollectionConfiguration cfg) {
+ return cli.set(name, cfg);
+ }
+
/** {@inheritDoc} */
@Override public void close() {
cli.close();