IGNITE-425: Implementation of ContinuousQueryWithTransformer
authorNikolay Izhikov <nizhikov@apache.org>
Thu, 8 Feb 2018 11:51:32 +0000 (14:51 +0300)
committerNikolay Izhikov <nizhikov@apache.org>
Thu, 8 Feb 2018 11:51:32 +0000 (14:51 +0300)
17 files changed:
modules/core/src/main/java/org/apache/ignite/cache/query/AbstractContinuousQuery.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQueryWithTransformer.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java [new file with mode: 0644]
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerClientSelfTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerFailoverTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerLocalSelfTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerPartitionedSelfTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerRandomOperationsTest.java [new file with mode: 0644]
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerReplicatedSelfTest.java [new file with mode: 0644]
modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java

diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/AbstractContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/AbstractContinuousQuery.java
new file mode 100644 (file)
index 0000000..2a615ee
--- /dev/null
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.EventType;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+
+/**
+ * Base class for continuous query.
+ *
+ * @see ContinuousQuery
+ * @see ContinuousQueryWithTransformer
+ */
+public abstract class AbstractContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
+    /**
+     * Default page size. Size of {@code 1} means that all entries
+     * will be sent to master node immediately (buffering is disabled).
+     */
+    public static final int DFLT_PAGE_SIZE = 1;
+
+    /** Maximum default time interval after which buffer will be flushed (if buffering is enabled). */
+    public static final long DFLT_TIME_INTERVAL = 0;
+
+    /**
+     * Default value for automatic unsubscription flag. Remote filters
+     * will be unregistered by default if master node leaves topology.
+     */
+    public static final boolean DFLT_AUTO_UNSUBSCRIBE = true;
+
+    /** Initial query. */
+    private Query<Cache.Entry<K, V>> initQry;
+
+    /** Remote filter factory. */
+    private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory;
+
+    /** Time interval. */
+    private long timeInterval = DFLT_TIME_INTERVAL;
+
+    /** Automatic unsubscription flag. */
+    private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE;
+
+    /** Whether to notify about {@link EventType#EXPIRED} events. */
+    private boolean includeExpired;
+
+    /**
+     * Sets initial query.
+     * <p>
+     * This query will be executed before continuous listener is registered
+     * which allows to iterate through entries which already existed at the
+     * time continuous query is executed.
+     *
+     * @param initQry Initial query.
+     * @return {@code this} for chaining.
+     */
+    public AbstractContinuousQuery<K, V> setInitialQuery(Query<Cache.Entry<K, V>> initQry) {
+        this.initQry = initQry;
+
+        return this;
+    }
+
+    /**
+     * Gets initial query.
+     *
+     * @return Initial query.
+     */
+    public Query<Cache.Entry<K, V>> getInitialQuery() {
+        return initQry;
+    }
+
+    /**
+     * Sets optional key-value filter factory. This factory produces filter is called before entry is
+     * sent to the master node.
+     * <p>
+     * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
+     * (e.g., synchronization or transactional cache operations), should be executed asynchronously
+     * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
+     * <p>
+     * If remote filter are annotated with {@link IgniteAsyncCallback} then it is executed in async callback
+     * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) that allow to perform a cache operations.
+     *
+     * @param rmtFilterFactory Key-value filter factory.
+     * @return {@code this} for chaining.
+     * @see IgniteAsyncCallback
+     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
+     */
+    public AbstractContinuousQuery<K, V> setRemoteFilterFactory(
+        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {
+        this.rmtFilterFactory = rmtFilterFactory;
+
+        return this;
+    }
+
+    /**
+     * Gets remote filter.
+     *
+     * @return Remote filter.
+     */
+    public Factory<? extends CacheEntryEventFilter<K, V>> getRemoteFilterFactory() {
+        return rmtFilterFactory;
+    }
+
+    /**
+     * Sets time interval.
+     * <p>
+     * When a cache update happens, entry is first put into a buffer. Entries from buffer will
+     * be sent to the master node only if the buffer is full (its size can be provided via {@link #setPageSize(int)}
+     * method) or time provided via this method is exceeded.
+     * <p>
+     * Default time interval is {@code 0} which means that
+     * time check is disabled and entries will be sent only when buffer is full.
+     *
+     * @param timeInterval Time interval.
+     * @return {@code this} for chaining.
+     */
+    public AbstractContinuousQuery<K, V> setTimeInterval(long timeInterval) {
+        if (timeInterval < 0)
+            throw new IllegalArgumentException("Time interval can't be negative.");
+
+        this.timeInterval = timeInterval;
+
+        return this;
+    }
+
+    /**
+     * Gets time interval.
+     *
+     * @return Time interval.
+     */
+    public long getTimeInterval() {
+        return timeInterval;
+    }
+
+    /**
+     * Sets automatic unsubscribe flag.
+     * <p>
+     * This flag indicates that query filters on remote nodes should be
+     * automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is
+     * {@code false}, filters will be unregistered only when the query is cancelled from master node, and won't ever be
+     * unregistered if master node leaves grid.
+     * <p>
+     * Default value for this flag is {@code true}.
+     *
+     * @param autoUnsubscribe Automatic unsubscription flag.
+     * @return {@code this} for chaining.
+     */
+    public AbstractContinuousQuery<K, V> setAutoUnsubscribe(boolean autoUnsubscribe) {
+        this.autoUnsubscribe = autoUnsubscribe;
+
+        return this;
+    }
+
+    /**
+     * Gets automatic unsubscription flag value.
+     *
+     * @return Automatic unsubscription flag.
+     */
+    public boolean isAutoUnsubscribe() {
+        return autoUnsubscribe;
+    }
+
+    /**
+     * Sets the flag value defining whether to notify about {@link EventType#EXPIRED} events.
+     * If {@code true}, then the remote listener will get notifications about entries
+     * expired in cache. Otherwise, only {@link EventType#CREATED}, {@link EventType#UPDATED}
+     * and {@link EventType#REMOVED} events will be fired in the remote listener.
+     * <p>
+     * This flag is {@code false} by default, so {@link EventType#EXPIRED} events are disabled.
+     *
+     * @param includeExpired Whether to notify about {@link EventType#EXPIRED} events.
+     */
+    public void setIncludeExpired(boolean includeExpired) {
+        this.includeExpired = includeExpired;
+    }
+
+    /**
+     * Gets the flag value defining whether to notify about {@link EventType#EXPIRED} events.
+     *
+     * @return Whether to notify about {@link EventType#EXPIRED} events.
+     */
+    public boolean isIncludeExpired() {
+        return includeExpired;
+    }
+}
index 49d471e..9a8fbca 100644 (file)
@@ -21,9 +21,9 @@ import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
-import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.lang.IgniteAsyncCallback;
 
@@ -103,49 +103,20 @@ import org.apache.ignite.lang.IgniteAsyncCallback;
  * is executed in async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()})
  * and notification order is kept the same as update order for given cache key.
  *
+ * @see ContinuousQueryWithTransformer
  * @see IgniteAsyncCallback
  * @see IgniteConfiguration#getAsyncCallbackPoolSize()
  */
-public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
+public final class ContinuousQuery<K, V> extends AbstractContinuousQuery<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /**
-     * Default page size. Size of {@code 1} means that all entries
-     * will be sent to master node immediately (buffering is disabled).
-     */
-    public static final int DFLT_PAGE_SIZE = 1;
-
-    /** Maximum default time interval after which buffer will be flushed (if buffering is enabled). */
-    public static final long DFLT_TIME_INTERVAL = 0;
-
-    /**
-     * Default value for automatic unsubscription flag. Remote filters
-     * will be unregistered by default if master node leaves topology.
-     */
-    public static final boolean DFLT_AUTO_UNSUBSCRIBE = true;
-
-    /** Initial query. */
-    private Query<Cache.Entry<K, V>> initQry;
-
     /** Local listener. */
     private CacheEntryUpdatedListener<K, V> locLsnr;
 
     /** Remote filter. */
     private CacheEntryEventSerializableFilter<K, V> rmtFilter;
 
-    /** Remote filter factory. */
-    private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory;
-
-    /** Time interval. */
-    private long timeInterval = DFLT_TIME_INTERVAL;
-
-    /** Automatic unsubscription flag. */
-    private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE;
-
-    /** Whether to notify about {@link EventType#EXPIRED} events. */
-    private boolean includeExpired;
-
     /**
      * Creates new continuous query.
      */
@@ -153,29 +124,9 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
         setPageSize(DFLT_PAGE_SIZE);
     }
 
-    /**
-     * Sets initial query.
-     * <p>
-     * This query will be executed before continuous listener is registered
-     * which allows to iterate through entries which already existed at the
-     * time continuous query is executed.
-     *
-     * @param initQry Initial query.
-     * @return {@code this} for chaining.
-     */
+    /** {@inheritDoc} */
     public ContinuousQuery<K, V> setInitialQuery(Query<Cache.Entry<K, V>> initQry) {
-        this.initQry = initQry;
-
-        return this;
-    }
-
-    /**
-     * Gets initial query.
-     *
-     * @return Initial query.
-     */
-    public Query<Cache.Entry<K, V>> getInitialQuery() {
-        return initQry;
+        return (ContinuousQuery<K, V>)super.setInitialQuery(initQry);
     }
 
     /**
@@ -197,6 +148,7 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
      * @return {@code this} for chaining.
      * @see IgniteAsyncCallback
      * @see IgniteConfiguration#getAsyncCallbackPoolSize()
+     * @see ContinuousQueryWithTransformer#setLocalListener(EventListener)
      */
     public ContinuousQuery<K, V> setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) {
         this.locLsnr = locLsnr;
@@ -246,118 +198,14 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
         return rmtFilter;
     }
 
-    /**
-     * Sets optional key-value filter factory. This factory produces filter is called before entry is
-     * sent to the master node.
-     * <p>
-     * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
-     * (e.g., synchronization or transactional cache operations), should be executed asynchronously
-     * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
-     * <p>
-     * If remote filter are annotated with {@link IgniteAsyncCallback} then it is executed in async callback
-     * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) that allow to perform a cache operations.
-     *
-     * @param rmtFilterFactory Key-value filter factory.
-     * @return {@code this} for chaining.
-     * @see IgniteAsyncCallback
-     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
-     */
-    public ContinuousQuery<K, V> setRemoteFilterFactory(
-        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {
-        this.rmtFilterFactory = rmtFilterFactory;
-
-        return this;
-    }
-
-    /**
-     * Gets remote filter.
-     *
-     * @return Remote filter.
-     */
-    public Factory<? extends CacheEntryEventFilter<K, V>> getRemoteFilterFactory() {
-        return rmtFilterFactory;
-    }
-
-    /**
-     * Sets time interval.
-     * <p>
-     * When a cache update happens, entry is first put into a buffer. Entries from buffer will
-     * be sent to the master node only if the buffer is full (its size can be provided via {@link #setPageSize(int)}
-     * method) or time provided via this method is exceeded.
-     * <p>
-     * Default time interval is {@code 0} which means that
-     * time check is disabled and entries will be sent only when buffer is full.
-     *
-     * @param timeInterval Time interval.
-     * @return {@code this} for chaining.
-     */
+    /** {@inheritDoc} */
     public ContinuousQuery<K, V> setTimeInterval(long timeInterval) {
-        if (timeInterval < 0)
-            throw new IllegalArgumentException("Time interval can't be negative.");
-
-        this.timeInterval = timeInterval;
-
-        return this;
+        return (ContinuousQuery<K, V>)super.setTimeInterval(timeInterval);
     }
 
-    /**
-     * Gets time interval.
-     *
-     * @return Time interval.
-     */
-    public long getTimeInterval() {
-        return timeInterval;
-    }
-
-    /**
-     * Sets automatic unsubscribe flag.
-     * <p>
-     * This flag indicates that query filters on remote nodes should be
-     * automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is
-     * {@code false}, filters will be unregistered only when the query is cancelled from master node, and won't ever be
-     * unregistered if master node leaves grid.
-     * <p>
-     * Default value for this flag is {@code true}.
-     *
-     * @param autoUnsubscribe Automatic unsubscription flag.
-     * @return {@code this} for chaining.
-     */
+    /** {@inheritDoc} */
     public ContinuousQuery<K, V> setAutoUnsubscribe(boolean autoUnsubscribe) {
-        this.autoUnsubscribe = autoUnsubscribe;
-
-        return this;
-    }
-
-    /**
-     * Gets automatic unsubscription flag value.
-     *
-     * @return Automatic unsubscription flag.
-     */
-    public boolean isAutoUnsubscribe() {
-        return autoUnsubscribe;
-    }
-
-    /**
-     * Sets the flag value defining whether to notify about {@link EventType#EXPIRED} events.
-     * If {@code true}, then the remote listener will get notifications about entries
-     * expired in cache. Otherwise, only {@link EventType#CREATED}, {@link EventType#UPDATED}
-     * and {@link EventType#REMOVED} events will be fired in the remote listener.
-     * <p>
-     * This flag is {@code false} by default, so {@link EventType#EXPIRED} events are disabled.
-     *
-     * @param includeExpired Whether to notify about {@link EventType#EXPIRED} events.
-     */
-    public void setIncludeExpired(boolean includeExpired) {
-        this.includeExpired = includeExpired;
-    }
-
-    /**
-     * Gets the flag value defining whether to notify about {@link EventType#EXPIRED} events.
-     *
-     * @return Whether to notify about {@link EventType#EXPIRED} events.
-     */
-    public boolean isIncludeExpired() {
-        return includeExpired;
+        return (ContinuousQuery<K, V>)super.setAutoUnsubscribe(autoUnsubscribe);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQueryWithTransformer.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQueryWithTransformer.java
new file mode 100644 (file)
index 0000000..122410f
--- /dev/null
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteClosure;
+
+/**
+ * API for configuring continuous cache queries with transformer.
+ * <p>
+ * Continuous queries allow to register a remote filter and a local listener
+ * for cache updates. If an update event passes the filter, it will be transformed with transformer and sent to
+ * the node that executed the query and local listener will be notified.
+ * <p>
+ * Additionally, you can execute initial query to get currently existing data.
+ * Query can be of any type (SQL, TEXT or SCAN) and can be set via {@link #setInitialQuery(Query)}
+ * method.
+ * <p>
+ * Query can be executed either on all nodes in topology using {@link IgniteCache#query(Query)}
+ * method, or only on the local node, if {@link Query#setLocal(boolean)} parameter is set to {@code true}.
+ * Note that in case query is distributed and a new node joins, it will get the remote
+ * filter for the query during discovery process before it actually joins topology,
+ * so no updates will be missed.
+ * This will execute query on all nodes that have cache you are working with and
+ * listener will start to receive notifications for cache updates.
+ * <p>
+ * To stop receiving updates call {@link QueryCursor#close()} method.
+ * Note that this works even if you didn't provide initial query. Cursor will
+ * be empty in this case, but it will still unregister listeners when {@link QueryCursor#close()}
+ * is called.
+ * <p>
+ * {@link IgniteAsyncCallback} annotation is supported for {@link CacheEntryEventFilter}
+ * (see {@link #setRemoteFilterFactory(Factory)}) and {@link CacheEntryUpdatedListener}
+ * (see {@link #setRemoteTransformerFactory(Factory)}) and {@link CacheEntryUpdatedListener}
+ * (see {@link #setLocalListener(EventListener)} and {@link EventListener}).
+ * If filter and/or listener are annotated with {@link IgniteAsyncCallback} then annotated callback
+ * is executed in async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()})
+ * and notification order is kept the same as update order for given cache key.
+ *
+ * @see ContinuousQuery
+ * @see IgniteAsyncCallback
+ * @see IgniteConfiguration#getAsyncCallbackPoolSize()
+ */
+public final class ContinuousQueryWithTransformer<K, V, T> extends AbstractContinuousQuery<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Remote transformer factory. */
+    private Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, T>> rmtTransFactory;
+
+    /** Local listener of transformed event */
+    private EventListener<T> locLsnr;
+
+    /**
+     * Creates new continuous query with transformer.
+     */
+    public ContinuousQueryWithTransformer() {
+        setPageSize(DFLT_PAGE_SIZE);
+    }
+
+    /** {@inheritDoc} */
+    public ContinuousQueryWithTransformer<K, V, T> setInitialQuery(Query<Cache.Entry<K, V>> initQry) {
+        return (ContinuousQueryWithTransformer<K, V, T>)super.setInitialQuery(initQry);
+    }
+
+    /** {@inheritDoc} */
+    public ContinuousQueryWithTransformer<K, V, T> setRemoteFilterFactory(
+        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {
+        return (ContinuousQueryWithTransformer<K, V, T>)super.setRemoteFilterFactory(rmtFilterFactory);
+    }
+
+    /**
+     * Sets transformer factory. This factory produces transformer is called after and only if entry passes the filter.
+     * <p>
+     * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking
+     * (e.g., synchronization or transactional cache operations), should be executed asynchronously
+     * without blocking the thread that called the filter. Otherwise, you can get deadlocks.
+     * <p>
+     *
+     * @param factory Remote transformer factory.
+     * @return {@code this} for chaining.
+     */
+    public ContinuousQueryWithTransformer<K, V, T> setRemoteTransformerFactory(
+        Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, T>> factory) {
+        this.rmtTransFactory = factory;
+
+        return this;
+    }
+
+    /**
+     * Gets remote transformer factory
+     *
+     * @return Remote Transformer Factory
+     */
+    public Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, T>> getRemoteTransformerFactory() {
+        return rmtTransFactory;
+    }
+
+    /**
+     * Sets local callback. This callback is called only in local node when new updates are received.
+     * <p>
+     * The callback predicate accepts results of transformed by {@link #getRemoteFilterFactory()} events
+     * <p>
+     * <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking (e.g.,
+     * synchronization or transactional cache operations), should be executed asynchronously without
+     * blocking the thread that called the callback. Otherwise, you can get deadlocks.
+     * <p>
+     * If local listener are annotated with {@link IgniteAsyncCallback} then it is executed in async callback pool
+     * (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) that allow to perform a cache operations.
+     *
+     * @param locLsnr Local callback.
+     * @return {@code this} for chaining.
+     *
+     * @see IgniteAsyncCallback
+     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
+     * @see ContinuousQuery#setLocalListener(CacheEntryUpdatedListener)
+     */
+    public ContinuousQueryWithTransformer<K, V, T> setLocalListener(EventListener<T> locLsnr) {
+        this.locLsnr = locLsnr;
+
+        return this;
+    }
+
+    /**
+     * Gets local transformed event listener
+     *
+     * @return local transformed event listener
+     */
+    public EventListener<T> getLocalListener() {
+        return locLsnr;
+    }
+
+    /** {@inheritDoc} */
+    public ContinuousQueryWithTransformer<K, V, T> setTimeInterval(long timeInterval) {
+        return (ContinuousQueryWithTransformer<K, V, T>)super.setTimeInterval(timeInterval);
+    }
+
+    /** {@inheritDoc} */
+    public ContinuousQueryWithTransformer<K, V, T> setAutoUnsubscribe(boolean autoUnsubscribe) {
+        return (ContinuousQueryWithTransformer<K, V, T>)super.setAutoUnsubscribe(autoUnsubscribe);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ContinuousQueryWithTransformer<K, V, T> setPageSize(int pageSize) {
+        return (ContinuousQueryWithTransformer<K, V, T>)super.setPageSize(pageSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ContinuousQueryWithTransformer<K, V, T> setLocal(boolean loc) {
+        return (ContinuousQueryWithTransformer<K, V, T>)super.setLocal(loc);
+    }
+
+    /**
+     * Interface for local listener of {@link ContinuousQueryWithTransformer} to implement.
+     * Invoked if an cache entry is updated, created or if a batch call is made,
+     * after the entries are updated and transformed.
+     *
+     * @param <T> type of data produced by transformer {@link ContinuousQueryWithTransformer#getRemoteTransformerFactory()}.
+     * @see ContinuousQueryWithTransformer
+     * @see ContinuousQueryWithTransformer#setLocalListener(EventListener)
+     */
+    public interface EventListener<T> {
+        /**
+         * Called after one or more entries have been updated.
+         *
+         * @param events The entries just updated that transformed with remote transformer of {@link ContinuousQueryWithTransformer}.
+         */
+        void onUpdated(Iterable<? extends T> events);
+    }
+}
index 7f71c74..a834022 100644 (file)
@@ -36,6 +36,8 @@ import javax.cache.Cache;
 import javax.cache.CacheException;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Configuration;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.integration.CompletionListener;
 import javax.cache.processor.EntryProcessor;
@@ -46,11 +48,15 @@ import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheManager;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.AbstractContinuousQuery;
 import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -62,6 +68,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cache.query.TextQuery;
 import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.AsyncSupportAdapter;
 import org.apache.ignite.internal.IgniteEx;
@@ -92,6 +99,7 @@ import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.mxbean.CacheMetricsMXBean;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.jetbrains.annotations.NotNull;
@@ -106,6 +114,12 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     /** */
     private static final long serialVersionUID = 0L;
 
+    /**
+     * Ignite version that introduce {@link ContinuousQueryWithTransformer} feature.
+     */
+    private static final IgniteProductVersion CONT_QRY_WITH_TRANSFORMER_SINCE =
+        IgniteProductVersion.fromString("2.5.0");
+
     /** Context. */
     private volatile GridCacheContext<K, V> ctx;
 
@@ -498,22 +512,66 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
      * @return Initial iteration cursor.
      */
     @SuppressWarnings("unchecked")
-    private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc, boolean keepBinary) {
-        if (qry.getInitialQuery() instanceof ContinuousQuery)
+    private QueryCursor<Cache.Entry<K, V>> queryContinuous(AbstractContinuousQuery qry, boolean loc, boolean keepBinary) {
+        assert qry instanceof ContinuousQuery || qry instanceof ContinuousQueryWithTransformer;
+
+        if (qry.getInitialQuery() instanceof ContinuousQuery ||
+            qry.getInitialQuery() instanceof ContinuousQueryWithTransformer) {
             throw new IgniteException("Initial predicate for continuous query can't be an instance of another " +
                 "continuous query. Use SCAN or SQL query for initial iteration.");
+        }
+
+        CacheEntryUpdatedListener locLsnr = null;
+
+        EventListener locTransLsnr = null;
+
+        CacheEntryEventSerializableFilter rmtFilter = null;
+
+        Factory<? extends IgniteClosure> rmtTransFactory = null;
+
+        if (qry instanceof ContinuousQuery) {
+            ContinuousQuery<K, V> qry0 = (ContinuousQuery<K, V>)qry;
+
+            if (qry0.getLocalListener() == null)
+                throw new IgniteException("Mandatory local listener is not set for the query: " + qry);
+
+            if (qry0.getRemoteFilter() != null && qry0.getRemoteFilterFactory() != null)
+                throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory.");
 
-        if (qry.getLocalListener() == null)
-            throw new IgniteException("Mandatory local listener is not set for the query: " + qry);
+            locLsnr = qry0.getLocalListener();
 
-        if (qry.getRemoteFilter() != null && qry.getRemoteFilterFactory() != null)
-            throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory.");
+            rmtFilter = qry0.getRemoteFilter();
+        }
+        else {
+            ContinuousQueryWithTransformer<K, V, ?> qry0 = (ContinuousQueryWithTransformer<K, V, ?>)qry;
+
+            if (qry0.getLocalListener() == null)
+                throw new IgniteException("Mandatory local transformed event listener is not set for the query: " + qry);
+
+            if (qry0.getRemoteTransformerFactory() == null)
+                throw new IgniteException("Mandatory RemoteTransformerFactory is not set for the query: " + qry);
+
+            Collection<ClusterNode> nodes = context().grid().cluster().nodes();
+
+            for (ClusterNode node : nodes) {
+                if (node.version().compareTo(CONT_QRY_WITH_TRANSFORMER_SINCE) < 0) {
+                    throw new IgniteException("Can't start ContinuousQueryWithTransformer, " +
+                        "because some nodes in cluster doesn't support this feature: " + node);
+                }
+            }
+
+            locTransLsnr = qry0.getLocalListener();
+
+            rmtTransFactory = qry0.getRemoteTransformerFactory();
+        }
 
         try {
             final UUID routineId = ctx.continuousQueries().executeQuery(
-                qry.getLocalListener(),
-                qry.getRemoteFilter(),
+                locLsnr,
+                locTransLsnr,
+                rmtFilter,
                 qry.getRemoteFilterFactory(),
+                rmtTransFactory,
                 qry.getPageSize(),
                 qry.getTimeInterval(),
                 qry.isAutoUnsubscribe(),
@@ -596,8 +654,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
             boolean keepBinary = opCtxCall != null && opCtxCall.isKeepBinary();
 
-            if (qry instanceof ContinuousQuery)
-                return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal(), keepBinary);
+            if (qry instanceof ContinuousQuery || qry instanceof ContinuousQueryWithTransformer)
+                return (QueryCursor<R>)queryContinuous((AbstractContinuousQuery)qry, qry.isLocal(), keepBinary);
 
             if (qry instanceof SqlQuery)
                 return (QueryCursor<R>)ctx.kernalContext().query().querySql(ctx, (SqlQuery)qry, keepBinary);
@@ -688,8 +746,8 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
      */
     private void validate(Query qry) {
         if (!QueryUtils.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) &&
-            !(qry instanceof ContinuousQuery) && !(qry instanceof SpiQuery) && !(qry instanceof SqlQuery) &&
-            !(qry instanceof SqlFieldsQuery))
+            !(qry instanceof ContinuousQuery) && !(qry instanceof ContinuousQueryWithTransformer) &&
+            !(qry instanceof SpiQuery) && !(qry instanceof SqlQuery) && !(qry instanceof SqlFieldsQuery))
             throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name() +
                     ". Use setIndexedTypes or setTypeMetadata methods on CacheConfiguration to enable.");
 
index 7e3f0b5..88005d0 100644 (file)
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -282,9 +281,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
      * @throws IgniteCheckedException In case of error.
      */
     void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException {
-        assert key != null;
-
-        key.prepareMarshal(cctx.cacheObjectContext());
+        if (key != null)
+            key.prepareMarshal(cctx.cacheObjectContext());
 
         if (newVal != null)
             newVal.prepareMarshal(cctx.cacheObjectContext());
@@ -300,7 +298,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
      */
     void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException {
         if (!isFiltered()) {
-            key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+            if (key != null)
+                key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
 
             if (newVal != null)
                 newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
index 59b2a68..f0cd7ca 100644 (file)
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -39,6 +40,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
@@ -69,6 +71,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -91,6 +94,19 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     static final int LSNR_MAX_BUF_SIZE =
         IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE", 10_000);
 
+    /**
+     * Transformer implementation for processing received remote events.
+     * They are already transformed so we simply return transformed value for event.
+     */
+    private transient IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> returnValTrans =
+        new IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, Object>() {
+            @Override public Object apply(CacheEntryEvent<? extends K, ? extends V> evt) {
+                assert evt.getKey() == null;
+
+                return evt.getValue();
+            }
+        };
+
     /** Cache name. */
     private String cacheName;
 
@@ -161,7 +177,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private transient boolean ignoreClsNotFound;
 
     /** */
-    private transient boolean asyncCb;
+    transient boolean asyncCb;
 
     /** */
     private transient UUID nodeId;
@@ -196,14 +212,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     public CacheContinuousQueryHandler(
         String cacheName,
         Object topic,
-        CacheEntryUpdatedListener<K, V> locLsnr,
-        CacheEntryEventSerializableFilter<K, V> rmtFilter,
+        @Nullable CacheEntryUpdatedListener<K, V> locLsnr,
+        @Nullable CacheEntryEventSerializableFilter<K, V> rmtFilter,
         boolean oldValRequired,
         boolean sync,
         boolean ignoreExpired,
         boolean ignoreClsNotFound) {
         assert topic != null;
-        assert locLsnr != null;
 
         this.cacheName = cacheName;
         this.topic = topic;
@@ -505,14 +520,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         if (asyncCb) {
                             ctx.asyncCallbackPool().execute(new Runnable() {
                                 @Override public void run() {
-                                    locLsnr.onUpdated(evts);
+                                    notifyLocalListener(evts, getTransformer());
                                 }
                             }, part);
                         }
                         else
                             skipCtx.addProcessClosure(new Runnable() {
                                 @Override public void run() {
-                                    locLsnr.onUpdated(evts);
+                                    notifyLocalListener(evts, getTransformer());
                                 }
                             });
                     }
@@ -584,6 +599,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /**
+     * @return Cache entry event transformer.
+     */
+    @Nullable protected IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> getTransformer() {
+        return null;
+    }
+
+    /**
+     * @return Local listener of transformed events.
+     */
+    @Nullable protected EventListener<?> localTransformedEventListener() {
+        return null;
+    }
+
+    /**
      * @param cctx Context.
      * @param nodeId ID of the node that started routine.
      * @param entry Entry.
@@ -752,8 +781,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
         }
 
-        if (!entries0.isEmpty())
-            locLsnr.onUpdated(entries0);
+        notifyLocalListener(entries0, returnValTrans);
     }
 
     /**
@@ -825,26 +853,31 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             if (cctx == null)
                 return;
 
-            final CacheContinuousQueryEntry entry = evt.entry();
+            CacheContinuousQueryEntry entry = evt.entry();
+
+            IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans = getTransformer();
 
             if (loc) {
                 if (!locCache) {
                     Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry);
 
-                    if (!evts.isEmpty())
-                        locLsnr.onUpdated(evts);
+                    notifyLocalListener(evts, trans);
 
                     if (!internal && !skipPrimaryCheck)
                         sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
                 }
                 else {
                     if (!entry.isFiltered())
-                        locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
+                        notifyLocalListener(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt), trans);
                 }
             }
             else {
-                if (!entry.isFiltered())
+                if (!entry.isFiltered()) {
+                    if (trans != null)
+                        entry = transformToEntry(trans, evt);
+
                     prepareEntry(cctx, nodeId, entry);
+                }
 
                 Object entryOrList = handleEntry(cctx, entry);
 
@@ -889,6 +922,28 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /**
+     * Notifies local listener.
+     *
+     * @param evts Events.
+     * @param trans Transformer
+     */
+    private void notifyLocalListener(Collection<CacheEntryEvent<? extends K, ? extends V>> evts,
+        @Nullable IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans) {
+        EventListener locTransLsnr = localTransformedEventListener();
+
+        assert (locLsnr != null && locTransLsnr == null) || (locLsnr == null && locTransLsnr != null);
+
+        if (F.isEmpty(evts))
+            return;
+
+        if (locLsnr != null)
+            locLsnr.onUpdated(evts);
+
+        if (locTransLsnr != null)
+            locTransLsnr.onUpdated(transform(trans, evts));
+    }
+
+    /**
      * @return Task name.
      */
     private String taskName() {
@@ -1258,4 +1313,72 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         }
     }
 
+    /**
+     * @param trans Transformer.
+     * @param evts Source events.
+     * @return Collection of transformed values.
+     */
+    private Iterable transform(final IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans,
+        Collection<CacheEntryEvent<? extends K, ? extends V>> evts) {
+        final Iterator<CacheEntryEvent<? extends K, ? extends V>> iter = evts.iterator();
+
+        return new Iterable() {
+            @NotNull @Override public Iterator iterator() {
+                return new Iterator() {
+                    @Override public boolean hasNext() {
+                        return iter.hasNext();
+                    }
+
+                    @Override public Object next() {
+                        return transform(trans, iter.next());
+                    }
+                };
+            }
+        };
+    }
+
+    /**
+     * Transform event data with {@link #getTransformer()} if exists.
+     *
+     * @param trans Transformer.
+     * @param evt Event to transform.
+     * @return Entry contains only transformed data if transformer exists. Unchanged event if transformer is not set.
+     * @see #getTransformer()
+     */
+    private CacheContinuousQueryEntry transformToEntry(IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans,
+        CacheContinuousQueryEvent<? extends K, ? extends V> evt) {
+        Object transVal = transform(trans, evt);
+
+        return new CacheContinuousQueryEntry(evt.entry().cacheId(),
+            evt.entry().eventType(),
+            null,
+            transVal == null ? null : cacheContext(ctx).toCacheObject(transVal),
+            null,
+            evt.entry().isKeepBinary(),
+            evt.entry().partition(),
+            evt.entry().updateCounter(),
+            evt.entry().topologyVersion(),
+            evt.entry().flags());
+    }
+
+    /**
+     * @param trans Transformer.
+     * @param evt Event.
+     * @return Transformed value.
+     */
+    private Object transform(IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans,
+        CacheEntryEvent<? extends K, ? extends V> evt) {
+        assert trans != null;
+
+        Object transVal = null;
+
+        try {
+            transVal = trans.apply(evt);
+        }
+        catch (Exception e) {
+            U.error(log, e);
+        }
+
+        return transVal;
+    }
 }
index e48d22e..86c1ae1 100644 (file)
@@ -41,7 +41,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
     private static final long serialVersionUID = 0L;
 
     /** Remote filter factory. */
-    private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
+    Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
 
     /** Deployable object for filter factory. */
     private CacheContinuousQueryDeployableObject rmtFilterFactoryDep;
@@ -74,8 +74,8 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
     public CacheContinuousQueryHandlerV2(
         String cacheName,
         Object topic,
-        CacheEntryUpdatedListener<K, V> locLsnr,
-        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
+        @Nullable CacheEntryUpdatedListener<K, V> locLsnr,
+        @Nullable Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
         boolean oldValRequired,
         boolean sync,
         boolean ignoreExpired,
@@ -89,9 +89,6 @@ public class CacheContinuousQueryHandlerV2<K, V> extends CacheContinuousQueryHan
             sync,
             ignoreExpired,
             ignoreClsNotFound);
-
-        assert rmtFilterFactory != null;
-
         this.rmtFilterFactory = rmtFilterFactory;
 
         if (types != null) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
new file mode 100644 (file)
index 0000000..14d5605
--- /dev/null
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteClosure;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Continuous query handler V3 version.
+ * Contains {@link Factory} for remote transformer and {@link EventListener}.
+ *
+ * @see ContinuousQueryWithTransformer
+ */
+public class CacheContinuousQueryHandlerV3<K, V> extends CacheContinuousQueryHandlerV2<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Remote transformer. */
+    private Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>> rmtTransFactory;
+
+    /** Deployable object for transformer. */
+    private CacheContinuousQueryDeployableObject rmtTransFactoryDep;
+
+    /** Remote transformer. */
+    private transient IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> rmtTrans;
+
+    /** Local listener for transformed events. */
+    private transient EventListener<?> locTransLsnr;
+
+    /**
+     * Empty constructor.
+     */
+    public CacheContinuousQueryHandlerV3() {
+        super();
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param topic Topic.
+     * @param locTransLsnr Local listener of transformed events
+     * @param rmtFilterFactory Remote filter factory.
+     * @param rmtTransFactory Remote transformer factory.
+     * @param oldValRequired OldValRequired flag.
+     * @param sync Sync flag.
+     * @param ignoreExpired IgnoreExpired flag.
+     * @param ignoreClsNotFound IgnoreClassNotFoundException flag.
+     */
+    public CacheContinuousQueryHandlerV3(
+        String cacheName,
+        Object topic,
+        EventListener<?> locTransLsnr,
+        @Nullable Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
+        Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>> rmtTransFactory,
+        boolean oldValRequired,
+        boolean sync,
+        boolean ignoreExpired,
+        boolean ignoreClsNotFound) {
+        super(
+            cacheName,
+            topic,
+            null,
+            rmtFilterFactory,
+            oldValRequired,
+            sync,
+            ignoreExpired,
+            ignoreClsNotFound,
+            null);
+
+        assert locTransLsnr != null;
+        assert rmtTransFactory != null;
+
+        this.locTransLsnr = locTransLsnr;
+        this.rmtTransFactory = rmtTransFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> getTransformer() {
+        if (rmtTrans == null && rmtTransFactory != null)
+            rmtTrans = rmtTransFactory.create();
+
+        return rmtTrans;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected EventListener<?> localTransformedEventListener() {
+        return locTransLsnr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheEntryEventFilter<K, V> getEventFilter() {
+        if (rmtFilterFactory == null)
+            return null;
+
+        return super.getEventFilter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public RegisterStatus register(UUID nodeId, UUID routineId,
+        GridKernalContext ctx) throws IgniteCheckedException {
+        final IgniteClosure trans = getTransformer();
+
+        if (trans != null)
+            ctx.resource().injectGeneric(trans);
+
+        if (locTransLsnr != null) {
+            ctx.resource().injectGeneric(locTransLsnr);
+
+            asyncCb = U.hasAnnotation(locTransLsnr, IgniteAsyncCallback.class);
+        }
+
+        return super.register(nodeId, routineId, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
+        super.p2pMarshal(ctx);
+
+        if (rmtTransFactory != null && !U.isGrid(rmtTransFactory.getClass()))
+            rmtTransFactoryDep = new CacheContinuousQueryDeployableObject(rmtTransFactory, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+        super.p2pUnmarshal(nodeId, ctx);
+
+        if (rmtTransFactoryDep != null)
+            rmtTransFactory = rmtTransFactoryDep.unmarshal(nodeId, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        boolean b = rmtTransFactoryDep != null;
+
+        out.writeBoolean(b);
+
+        if (b)
+            out.writeObject(rmtTransFactoryDep);
+        else
+            out.writeObject(rmtTransFactory);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+
+        boolean b = in.readBoolean();
+
+        if (b)
+            rmtTransFactoryDep = (CacheContinuousQueryDeployableObject)in.readObject();
+        else
+            rmtTransFactory = (Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>>)in.readObject();
+    }
+}
index 628111b..1e131ef 100644 (file)
@@ -49,6 +49,7 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -57,8 +58,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI2;
@@ -66,6 +67,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -434,6 +436,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     /**
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
+     * @param rmtFilterFactory Remote filter factory
      * @param bufSize Buffer size.
      * @param timeInterval Time interval.
      * @param autoUnsubscribe Auto unsubscribe flag.
@@ -441,9 +444,11 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
-    public UUID executeQuery(final CacheEntryUpdatedListener locLsnr,
+    public UUID executeQuery(@Nullable final CacheEntryUpdatedListener locLsnr,
+        @Nullable final EventListener locTransLsnr,
         @Nullable final CacheEntryEventSerializableFilter rmtFilter,
         @Nullable final Factory<? extends CacheEntryEventFilter> rmtFilterFactory,
+        @Nullable final Factory<? extends IgniteClosure> rmtTransFactory,
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
@@ -453,12 +458,30 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     {
         IgniteOutClosure<CacheContinuousQueryHandler> clsr;
 
-        if (rmtFilterFactory != null)
+        if (rmtTransFactory != null) {
+            clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply() {
+                    assert locTransLsnr != null;
+
+                    return new CacheContinuousQueryHandlerV3(
+                        cctx.name(),
+                        TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
+                        locTransLsnr,
+                        rmtFilterFactory,
+                        rmtTransFactory,
+                        true,
+                        false,
+                        !includeExpired,
+                        false);
+                }
+            };
+        }
+        else if (rmtFilterFactory != null) {
             clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
                 @Override public CacheContinuousQueryHandler apply() {
-                    CacheContinuousQueryHandler hnd;
+                    assert locLsnr != null;
 
-                    hnd = new CacheContinuousQueryHandlerV2(
+                    return new CacheContinuousQueryHandlerV2(
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
                         locLsnr,
@@ -468,13 +491,15 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                         !includeExpired,
                         false,
                         null);
-
-                    return hnd;
                 }
             };
-        else
+        }
+        else {
             clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
                 @Override public CacheContinuousQueryHandler apply() {
+                    assert locLsnr != null;
+                    assert locTransLsnr == null;
+
                     return new CacheContinuousQueryHandler(
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
@@ -486,6 +511,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                         false);
                 }
             };
+        }
 
         return executeQuery0(
             locLsnr,
@@ -676,6 +702,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         }
 
         if (notifyExisting) {
+            assert locLsnr != null : "Local listener can't be null if notification for existing entries are enabled";
+
             final Iterator<CacheDataRow> it = cctx.offheap().cacheIterator(cctx.cacheId(),
                 true,
                 true,
index 142ff35..9a815de 100644 (file)
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryListenerException;
@@ -51,8 +52,11 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.AbstractContinuousQuery;
 import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
@@ -62,6 +66,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -464,7 +469,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
         IgniteCache<QueryTestKey, QueryTestValue> cache = grid(getClientIndex()).createCache(ccfg);
 
         try {
-            ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+            AbstractContinuousQuery<QueryTestKey, QueryTestValue> qry = createQuery();
 
             final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts =
                 new CopyOnWriteArrayList<>();
@@ -472,13 +477,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             if (noOpFilterFactory() != null)
                 qry.setRemoteFilterFactory(noOpFilterFactory());
 
-            qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
-                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
-                    ? extends QueryTestValue>> events) throws CacheEntryListenerException {
-                    for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events)
-                        evts.add(e);
-                }
-            });
+            if (qry instanceof ContinuousQuery) {
+                ((ContinuousQuery<QueryTestKey, QueryTestValue>)qry).setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+                        ? extends QueryTestValue>> events) throws CacheEntryListenerException {
+                        for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events)
+                            evts.add(e);
+                    }
+                });
+            }
+            else if (qry instanceof ContinuousQueryWithTransformer)
+                initQueryWithTransformer(
+                    (ContinuousQueryWithTransformer<QueryTestKey, QueryTestValue, CacheEntryEvent>)qry, evts);
+            else
+                fail("Unknown query type");
 
             QueryTestKey key = new QueryTestKey(1);
 
@@ -595,7 +607,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
         IgniteCache<QueryTestKey, QueryTestValue> cache = grid(getClientIndex()).createCache(ccfg);
 
         try {
-            ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+            AbstractContinuousQuery<QueryTestKey, QueryTestValue> qry = createQuery();
 
             final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts =
                 new CopyOnWriteArrayList<>();
@@ -603,13 +615,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             if (noOpFilterFactory() != null)
                 qry.setRemoteFilterFactory(noOpFilterFactory());
 
-            qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
-                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
-                    ? extends QueryTestValue>> events) throws CacheEntryListenerException {
-                    for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events)
-                        evts.add(e);
-                }
-            });
+            if (qry instanceof ContinuousQuery) {
+                ((ContinuousQuery<QueryTestKey, QueryTestValue>)qry).setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+                        ? extends QueryTestValue>> events) throws CacheEntryListenerException {
+                        for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events)
+                            evts.add(e);
+                    }
+                });
+            }
+            else if (qry instanceof ContinuousQueryWithTransformer)
+                initQueryWithTransformer(
+                    (ContinuousQueryWithTransformer<QueryTestKey, QueryTestValue, CacheEntryEvent>)qry, evts);
+            else
+                fail("Unknown query type");
 
             Map<QueryTestKey, QueryTestValue> map = new TreeMap<>();
 
@@ -834,16 +853,23 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             Collection<QueryCursor<?>> curs = new ArrayList<>();
 
             if (deploy == CLIENT) {
-                ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+                AbstractContinuousQuery<Object, Object> qry = createQuery();
 
                 final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
 
-                qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
-                    @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
-                        for (CacheEntryEvent<?, ?> evt : evts)
-                            evtsQueue.add(evt);
-                    }
-                });
+                if (qry instanceof ContinuousQuery) {
+                    ((ContinuousQuery<Object, Object>)qry).setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                            for (CacheEntryEvent<?, ?> evt : evts)
+                                evtsQueue.add(evt);
+                        }
+                    });
+                }
+                else if (qry instanceof ContinuousQueryWithTransformer)
+                    initQueryWithTransformer(
+                        (ContinuousQueryWithTransformer<Object, Object, CacheEntryEvent>)qry, evtsQueue);
+                else
+                    fail("Unknown query type");
 
                 evtsQueues.add(evtsQueue);
 
@@ -852,16 +878,23 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
                 curs.add(cur);
             }
             else if (deploy == SERVER) {
-                ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+                AbstractContinuousQuery<Object, Object> qry = createQuery();
 
                 final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
 
-                qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
-                    @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
-                        for (CacheEntryEvent<?, ?> evt : evts)
-                            evtsQueue.add(evt);
-                    }
-                });
+                if (qry instanceof ContinuousQuery) {
+                    ((ContinuousQuery<Object, Object>)qry).setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                            for (CacheEntryEvent<?, ?> evt : evts)
+                                evtsQueue.add(evt);
+                        }
+                    });
+                }
+                else if (qry instanceof ContinuousQueryWithTransformer)
+                    initQueryWithTransformer(
+                        (ContinuousQueryWithTransformer<Object, Object, CacheEntryEvent>)qry, evtsQueue);
+                else
+                    fail("Unknown query type");
 
                 evtsQueues.add(evtsQueue);
 
@@ -871,16 +904,23 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
             }
             else {
                 for (int i = 0; i <= getServerNodeCount(); i++) {
-                    ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+                    AbstractContinuousQuery<Object, Object> qry = createQuery();
 
                     final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
 
-                    qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
-                        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
-                            for (CacheEntryEvent<?, ?> evt : evts)
-                                evtsQueue.add(evt);
-                        }
-                    });
+                    if (qry instanceof ContinuousQuery) {
+                        ((ContinuousQuery<Object, Object>)qry).setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                                for (CacheEntryEvent<?, ?> evt : evts)
+                                    evtsQueue.add(evt);
+                            }
+                        });
+                    }
+                    else if (qry instanceof ContinuousQueryWithTransformer)
+                        initQueryWithTransformer(
+                            (ContinuousQueryWithTransformer<Object, Object, CacheEntryEvent>)qry, evtsQueue);
+                    else
+                        fail("Unknown query type");
 
                     evtsQueues.add(evtsQueue);
 
@@ -1417,6 +1457,15 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     }
 
     /**
+     * @param <K> Key type.
+     * @param <V> Value type.
+     * @return New instance of continuous query.
+     */
+    protected <K, V> AbstractContinuousQuery<K, V> createQuery() {
+        return new ContinuousQuery<>();
+    }
+
+    /**
      *
      */
     private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
@@ -1586,4 +1635,37 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     protected enum ContinuousDeploy {
         CLIENT, SERVER, ALL
     }
+
+    /**
+     * Initialize continuous query with transformer.
+     * Query will accumulate all events in accumulator.
+     *
+     * @param qry Continuous query.
+     * @param acc Accumulator for events.
+     * @param <K> Key type.
+     * @param <V> Value type.
+     */
+    private <K, V> void initQueryWithTransformer(
+        ContinuousQueryWithTransformer<K, V, CacheEntryEvent> qry,
+        Collection<CacheEntryEvent<? extends K, ? extends V>> acc) {
+
+        IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, CacheEntryEvent> transformer =
+            new IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, CacheEntryEvent>() {
+                @Override public CacheEntryEvent apply(CacheEntryEvent<? extends K, ? extends V> event) {
+                    return event;
+                }
+            };
+
+        ContinuousQueryWithTransformer<K, V, CacheEntryEvent> qry0 =
+            (ContinuousQueryWithTransformer<K, V, CacheEntryEvent>)qry;
+
+        qry0.setRemoteTransformerFactory(FactoryBuilder.factoryOf(transformer));
+
+        qry0.setLocalListener(new EventListener<CacheEntryEvent>() {
+            @Override public void onUpdated(Iterable<? extends CacheEntryEvent> events) {
+                for (CacheEntryEvent e : events)
+                    acc.add(e);
+            }
+        });
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerClientSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerClientSelfTest.java
new file mode 100644 (file)
index 0000000..8d8b4c1
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.Ignite;
+
+/**
+ */
+public class CacheContinuousWithTransformerClientSelfTest extends CacheContinuousWithTransformerReplicatedSelfTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        client = true;
+
+        startGrid("client");
+
+        client = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Ignite gridToRunQuery() throws Exception {
+        return grid("client");
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerFailoverTest.java
new file mode 100644 (file)
index 0000000..241dc2a
--- /dev/null
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.concurrent.CountDownLatch;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ */
+public class CacheContinuousWithTransformerFailoverTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testServerNodeLeft() throws Exception {
+        startGrids(3);
+
+        client = true;
+
+        final int CLIENT_ID = 3;
+
+        Ignite clnNode = startGrid(CLIENT_ID);
+
+        client = false;
+
+        IgniteOutClosure<IgniteCache<Integer, Integer>> cache =
+            new IgniteOutClosure<IgniteCache<Integer, Integer>>() {
+                int cnt = 0;
+
+                @Override public IgniteCache<Integer, Integer> apply() {
+                    ++cnt;
+
+                    return grid(CLIENT_ID).cache(DEFAULT_CACHE_NAME);
+                }
+            };
+
+        final CacheEventListener lsnr = new CacheEventListener();
+
+        ContinuousQueryWithTransformer<Object, Object, String> qry = new ContinuousQueryWithTransformer<>();
+
+        qry.setLocalListener(lsnr);
+        qry.setRemoteTransformerFactory(FactoryBuilder.factoryOf(new IgniteClosure<CacheEntryEvent<?, ?>, String>() {
+            @Override public String apply(CacheEntryEvent<?, ?> evt) {
+                return "" + evt.getKey() + evt.getValue();
+            }
+        }));
+
+        QueryCursor<?> cur = clnNode.cache(DEFAULT_CACHE_NAME).query(qry);
+
+        boolean first = true;
+
+        int keyCnt = 1;
+
+        for (int i = 0; i < 10; i++) {
+            log.info("Start iteration: " + i);
+
+            if (first)
+                first = false;
+            else {
+                for (int srv = 0; srv < CLIENT_ID - 1; srv++)
+                    startGrid(srv);
+            }
+
+            lsnr.latch = new CountDownLatch(keyCnt);
+
+            for (int key = 0; key < keyCnt; key++)
+                cache.apply().put(key, key);
+
+            assertTrue("Failed to wait for event. Left events: " + lsnr.latch.getCount(),
+                lsnr.latch.await(10, SECONDS));
+
+            for (int srv = 0; srv < CLIENT_ID - 1; srv++)
+                stopGrid(srv);
+        }
+
+        tryClose(cur);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformerException() throws Exception {
+        try {
+            startGrids(1);
+
+            Ignite ignite = ignite(0);
+
+            IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+            final CountDownLatch latch = new CountDownLatch(10);
+
+            ContinuousQueryWithTransformer<Integer, Integer, Integer> qry = new ContinuousQueryWithTransformer<>();
+
+            qry.setLocalListener(new EventListener<Integer>() {
+                /** */
+                @LoggerResource
+                private IgniteLogger log;
+
+                @Override public void onUpdated(Iterable<? extends Integer> evts) throws CacheEntryListenerException {
+                    for (Integer evt : evts) {
+                        log.debug("" + evt);
+                    }
+                }
+            });
+
+            qry.setRemoteTransformerFactory(FactoryBuilder.factoryOf(new IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Integer>, Integer>() {
+                @Override public Integer apply(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+                    latch.countDown();
+
+                    throw new RuntimeException("Test error.");
+                }
+            }));
+
+            qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheEntryEventSerializableFilter<Integer, Integer>() {
+                @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+                    return true;
+                }
+            }));
+
+            try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+                for (int i = 0; i < 10; i++)
+                    cache.put(i, i);
+
+                assertTrue(latch.await(10, SECONDS));
+            }
+        } finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Ensure that every node see every update.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCrossCallback() throws Exception {
+        startGrids(2);
+        try {
+            IgniteCache<Integer, Integer> cache1 = grid(0).cache(DEFAULT_CACHE_NAME);
+            IgniteCache<Integer, Integer> cache2 = grid(1).cache(DEFAULT_CACHE_NAME);
+
+            final int key1 = primaryKey(cache1);
+            final int key2 = primaryKey(cache2);
+
+            final CountDownLatch latch1 = new CountDownLatch(2);
+            final CountDownLatch latch2 = new CountDownLatch(2);
+
+            Factory<? extends IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Integer>, Integer>> factory =
+                FactoryBuilder.factoryOf(
+                    new IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Integer>, Integer>() {
+                        @Override public Integer apply(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
+                            return evt.getKey();
+                        }
+                    });
+
+            ContinuousQueryWithTransformer<Integer, Integer, Integer> qry1 = new ContinuousQueryWithTransformer<>();
+
+            qry1.setRemoteTransformerFactory(factory);
+
+            qry1.setLocalListener(new EventListener<Integer>() {
+                @Override public void onUpdated(Iterable<? extends Integer> evts) {
+                    for (int evt : evts) {
+                        log.info("Update in cache 1: " + evt);
+
+                        if (evt == key1 || evt == key2)
+                            latch1.countDown();
+                    }
+                }
+            });
+
+            ContinuousQueryWithTransformer<Integer, Integer, Integer> qry2 = new ContinuousQueryWithTransformer<>();
+
+            qry2.setRemoteTransformerFactory(factory);
+
+            qry2.setLocalListener(new EventListener<Integer>() {
+                @Override public void onUpdated(Iterable<? extends Integer> evts) {
+                    for (int evt : evts) {
+                        log.info("Update in cache 2: " + evt);
+
+                        if (evt == key1 || evt == key2)
+                            latch2.countDown();
+                    }
+                }
+            });
+
+            try (QueryCursor<Cache.Entry<Integer, Integer>> ignored1 = cache2.query(qry1);
+                 QueryCursor<Cache.Entry<Integer, Integer>> ignored2 = cache2.query(qry2)) {
+                cache1.put(key1, key1);
+                cache1.put(key2, key2);
+
+                assertTrue(latch1.await(10, SECONDS));
+                assertTrue(latch2.await(10, SECONDS));
+            }
+        } finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param cur Cur.
+     */
+    private void tryClose(QueryCursor<?> cur) {
+        try {
+            cur.close();
+        }
+        catch (Throwable e) {
+            if (e instanceof IgniteClientDisconnectedException) {
+                IgniteClientDisconnectedException ex = (IgniteClientDisconnectedException)e;
+
+                ex.reconnectFuture().get();
+
+                cur.close();
+            }
+            else
+                throw e;
+        }
+    }
+
+    /**
+     */
+    private static class CacheEventListener implements EventListener<String> {
+        /** */
+        public volatile CountDownLatch latch = new CountDownLatch(1);
+
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<? extends String> evts) {
+            for (Object evt : evts) {
+                log.info("Received cache event: " + evt);
+
+                latch.countDown();
+            }
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerLocalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerLocalSelfTest.java
new file mode 100644 (file)
index 0000000..f01764f
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ */
+public class CacheContinuousWithTransformerLocalSelfTest extends CacheContinuousWithTransformerReplicatedSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.LOCAL;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerPartitionedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerPartitionedSelfTest.java
new file mode 100644 (file)
index 0000000..a76dc98
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ */
+public class CacheContinuousWithTransformerPartitionedSelfTest extends CacheContinuousWithTransformerReplicatedSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerRandomOperationsTest.java
new file mode 100644 (file)
index 0000000..fe5ab20
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.query.AbstractContinuousQuery;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
+
+/**
+ * Test to check random continuous query operation for ContinuousQueryWithTransformer
+ */
+public class CacheContinuousWithTransformerRandomOperationsTest extends CacheContinuousQueryRandomOperationsTest {
+    /** {@inheritDoc} */
+    @Override protected <K, V> AbstractContinuousQuery<K, V> createQuery() {
+        return new ContinuousQueryWithTransformer<>();
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerReplicatedSelfTest.java
new file mode 100644 (file)
index 0000000..6035b93
--- /dev/null
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.EventType;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+/**
+ */
+public class CacheContinuousWithTransformerReplicatedSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final int DFLT_ENTRY_CNT = 10;
+
+    /** */
+    private static final int DFLT_LATCH_TIMEOUT = 30_000;
+
+    /** */
+    private static final int DFLT_SERVER_NODE_CNT = 1;
+
+    /** */
+    private static final String SARAH_CONNOR = "Sarah Connor";
+
+    /** */
+    private static final String JOHN_CONNOR = "John Connor";
+
+    /** */
+    private static final boolean ADD_EVT_FILTER = true;
+
+    /** */
+    private static final boolean SKIP_EVT_FILTER = false;
+
+    /** */
+    private static final boolean KEEP_BINARY = true;
+
+    /** */
+    private static final boolean SKIP_KEEP_BINARY = false;
+
+    /** */
+    private static final long LATCH_TIMEOUT = 10_000L;
+
+    /** */
+    protected boolean client = false;
+
+    /** */
+    protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (client)
+            cfg.setClientMode(true);
+        else {
+            CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+            ccfg.setCacheMode(cacheMode());
+
+            cfg.setCacheConfiguration(ccfg);
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        gridToRunQuery().cache(DEFAULT_CACHE_NAME).removeAll();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(DFLT_SERVER_NODE_CNT);
+
+        super.beforeTestsStarted();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids(true);
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @return Grid to run query on.
+     * @throws Exception If failed.
+     */
+    protected Ignite gridToRunQuery() throws Exception {
+        return grid(0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformer() throws Exception {
+        runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, SKIP_KEEP_BINARY, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerAsync() throws Exception {
+        runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, SKIP_KEEP_BINARY, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerAndRegularListener() throws Exception {
+        runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, SKIP_KEEP_BINARY, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerAndRegularListenerAsync() throws Exception {
+        runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, SKIP_KEEP_BINARY, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerWithFilter() throws Exception {
+        runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, SKIP_KEEP_BINARY, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerWithFilterAsync() throws Exception {
+        runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, SKIP_KEEP_BINARY, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerAndRegularListenerWithFilter() throws Exception {
+        runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, KEEP_BINARY, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerAndRegularListenerWithFilterAsync() throws Exception {
+        runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, KEEP_BINARY, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerKeepBinary() throws Exception {
+        runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, KEEP_BINARY, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerKeepBinaryAsync() throws Exception {
+        runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, KEEP_BINARY, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerAndRegularListenerKeepBinary() throws Exception {
+        runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, KEEP_BINARY, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerAndRegularListenerKeepBinaryAsync() throws Exception {
+        runContinuousQueryWithTransformer(SKIP_EVT_FILTER, DFLT_ENTRY_CNT, KEEP_BINARY, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerWithFilterKeepBinary() throws Exception {
+        runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, KEEP_BINARY, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerWithFilterKeepBinaryAsync() throws Exception {
+        runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, KEEP_BINARY, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerAndRegularListenerWithFilterKeepBinary() throws Exception {
+        runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, KEEP_BINARY, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testContinuousWithTransformerAndRegularListenerWithFilterKeepBinaryAsync() throws Exception {
+        runContinuousQueryWithTransformer(ADD_EVT_FILTER, DFLT_ENTRY_CNT / 2, KEEP_BINARY, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformerReturnNull() throws Exception {
+        Ignite ignite = gridToRunQuery();
+
+        IgniteCache<Integer, Employee> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        ContinuousQueryWithTransformer<Integer, Employee, String> qry = new ContinuousQueryWithTransformer<>();
+
+        final AtomicInteger cnt = new AtomicInteger(0);
+
+        qry.setLocalListener(new EventListener() {
+            @Override public void onUpdated(Iterable events) throws CacheEntryListenerException {
+                for (Object e : events) {
+                    assertNull(e);
+
+                    cnt.incrementAndGet();
+                }
+            }
+        });
+
+        qry.setRemoteTransformerFactory(FactoryBuilder.factoryOf(
+            new IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Employee>, String>() {
+                @Override public String apply(CacheEntryEvent<? extends Integer, ? extends Employee> evt) {
+                    return null;
+                }
+        }));
+
+        qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheEntryEventSerializableFilter<Integer, Employee>() {
+            @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Employee> evt) {
+                return true;
+            }
+        }));
+
+        try (QueryCursor<Cache.Entry<Integer, Employee>> ignored = cache.query(qry)) {
+            for (int i = 0; i < 10; i++)
+                cache.put(i, new Employee(JOHN_CONNOR, i));
+
+            boolean evtsReceived = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return cnt.get() == 10;
+                }
+            }, 20_000);
+
+            assertTrue(evtsReceived);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExpired() throws Exception {
+        Ignite ignite = gridToRunQuery();
+
+        IgniteCache<Integer, Employee> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        cache = cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100)));
+
+        final Set<Integer> keys = new GridConcurrentHashSet<>();
+        final CountDownLatch latch = new CountDownLatch(2);
+
+        ContinuousQueryWithTransformer<Integer, Employee, Integer> qry = new ContinuousQueryWithTransformer<>();
+
+        qry.setIncludeExpired(true);
+
+        qry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheEntryEventSerializableFilter<Integer, Employee>() {
+            @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException {
+                return event.getEventType() == EventType.EXPIRED;
+            }
+        }));
+
+        qry.setRemoteTransformerFactory(FactoryBuilder.factoryOf(
+            new IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Employee>, Integer>() {
+                @Override public Integer apply(CacheEntryEvent<? extends Integer, ? extends Employee> evt) {
+                    assertNull(evt.getValue());
+
+                    assertNotNull(evt.getOldValue());
+
+                    return evt.getKey();
+                }
+            }));
+
+        qry.setLocalListener(new EventListener<Integer>() {
+            @Override public void onUpdated(Iterable<? extends Integer> evts) {
+                for (Integer key : evts) {
+                    keys.add(key);
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Employee>> ignored = cache.query(qry)) {
+            cache.put(1, new Employee(SARAH_CONNOR, 42));
+            cache.put(2, new Employee(JOHN_CONNOR, 42));
+
+            // Wait for expiration.
+            latch.await(LATCH_TIMEOUT, MILLISECONDS);
+
+            assertEquals(2, keys.size());
+
+            assertTrue(keys.contains(1));
+            assertTrue(keys.contains(2));
+        }
+    }
+
+    /**
+     * @param addEvtFilter Add event filter to ContinuousQueryWithTransformer flag.
+     * @param expTransCnt Expected transformed event count.
+     * @param keepBinary Keep binary flag.
+     * @param async Flag to use transformed event listener with {@link IgniteAsyncCallback}.
+     * @throws Exception If failed.
+     */
+    private void runContinuousQueryWithTransformer(boolean addEvtFilter, int expTransCnt, boolean keepBinary,
+        boolean async)
+        throws Exception {
+        Ignite ignite = gridToRunQuery();
+
+        IgniteCache<Integer, Employee> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        if (keepBinary)
+            cache = cache.withKeepBinary();
+
+        populateData(cache, JOHN_CONNOR);
+
+        CountDownLatch transUpdCntLatch = new CountDownLatch(expTransCnt);
+
+        AtomicInteger transCnt = new AtomicInteger(0);
+
+        EventListener<String> transLsnr = async ?
+            new LocalEventListenerAsync(transCnt, transUpdCntLatch) :
+            new LocalEventListener(transCnt, transUpdCntLatch);
+
+        Factory<? extends CacheEntryEventFilter> rmtFilterFactory = null;
+
+        if (addEvtFilter)
+            rmtFilterFactory = FactoryBuilder.factoryOf(new RemoteCacheEntryEventFilter());
+
+        Factory<? extends IgniteClosure> factory = FactoryBuilder.factoryOf(new RemoteTransformer(keepBinary));
+
+        ContinuousQueryWithTransformer<Integer, Employee, String> qry = new ContinuousQueryWithTransformer<>();
+
+        qry.setInitialQuery(new ScanQuery<Integer, Employee>());
+        qry.setRemoteFilterFactory((Factory<? extends CacheEntryEventFilter<Integer, Employee>>)rmtFilterFactory);
+        qry.setRemoteTransformerFactory(
+            (Factory<? extends IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Employee>, String>>)factory);
+        qry.setLocalListener(transLsnr);
+
+        try (QueryCursor<Cache.Entry<Integer, Employee>> cur = cache.query(qry)) {
+            for (Cache.Entry<Integer, Employee> e : cur) {
+                assertNotNull(e);
+
+                if (keepBinary) {
+                    assertTrue(((BinaryObject)e.getValue())
+                        .field("name").toString().startsWith(JOHN_CONNOR));
+                }
+                else {
+                    assertTrue(e.getValue().name.startsWith(JOHN_CONNOR));
+                }
+            }
+
+            populateData(cache, SARAH_CONNOR);
+
+            assertTrue("Receive all expected events",
+                transUpdCntLatch.await(DFLT_LATCH_TIMEOUT, MILLISECONDS));
+            assertEquals("Count of updated records equal to expected", expTransCnt, transCnt.get());
+
+        }
+    }
+
+    /**
+     * Put some data to cache.
+     *
+     * @param cache Cache to put data to.
+     * @param name Base name of Employee.
+     */
+    private void populateData(IgniteCache<Integer, Employee> cache, String name) {
+        for (int i = 0; i < DFLT_ENTRY_CNT; i++)
+            cache.put(i, new Employee(name + i, 42 * i));
+    }
+
+    /**
+     */
+    @IgniteAsyncCallback
+    private static class LocalEventListenerAsync extends LocalEventListener {
+        LocalEventListenerAsync(AtomicInteger transCnt, CountDownLatch transUpdCnt) {
+            super(transCnt, transUpdCnt);
+        }
+    }
+
+    /**
+     */
+    private static class RemoteTransformer implements IgniteClosure<CacheEntryEvent<?, ?>, String> {
+        /** */
+        private boolean keepBinary;
+
+        /** */
+        RemoteTransformer(boolean keepBinary) {
+            this.keepBinary = keepBinary;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String apply(CacheEntryEvent<?, ?> evt) {
+            if (keepBinary)
+                return ((BinaryObject)evt.getValue()).field("name");
+
+            return ((Employee)evt.getValue()).name;
+        }
+    }
+
+    /**
+     */
+    private static class RemoteCacheEntryEventFilter implements CacheEntryEventSerializableFilter<Integer, Object> {
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(
+            CacheEntryEvent<? extends Integer, ?> event) throws CacheEntryListenerException {
+            return event.getKey() % 2 == 0;
+        }
+    }
+
+    /**
+     */
+    private static class LocalEventListener implements EventListener<String> {
+        /** */
+        private final AtomicInteger cnt;
+
+        /** */
+        private final CountDownLatch cntLatch;
+
+        /** */
+        LocalEventListener(AtomicInteger transCnt, CountDownLatch transUpdCnt) {
+            this.cnt = transCnt;
+            this.cntLatch = transUpdCnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<? extends String> events) throws CacheEntryListenerException {
+            for (String evt : events) {
+                if (evt.startsWith(SARAH_CONNOR))
+                    cntLatch.countDown();
+
+                cnt.incrementAndGet();
+            }
+        }
+    }
+
+    /**
+     */
+    public class Employee {
+        /** */
+        public String name;
+
+        /** */
+        public Integer salary;
+
+        /** */
+        Employee(String name, Integer salary) {
+            this.name = name;
+            this.salary = salary;
+        }
+    }
+}
index 4ea8bca..486626a 100644 (file)
@@ -32,6 +32,12 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOrderingEventTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTwoNodesTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerFailoverTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerClientSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerLocalSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerPartitionedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerRandomOperationsTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationNearEnabledTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest;
@@ -124,6 +130,13 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryConcurrentPartitionUpdateTest.class);
         suite.addTestSuite(CacheContinuousQueryEventBufferTest.class);
 
+        suite.addTestSuite(CacheContinuousWithTransformerReplicatedSelfTest.class);
+        suite.addTestSuite(CacheContinuousWithTransformerLocalSelfTest.class);
+        suite.addTestSuite(CacheContinuousWithTransformerPartitionedSelfTest.class);
+        suite.addTestSuite(CacheContinuousWithTransformerClientSelfTest.class);
+        suite.addTestSuite(CacheContinuousWithTransformerFailoverTest.class);
+        suite.addTestSuite(CacheContinuousWithTransformerRandomOperationsTest.class);
+
         return suite;
     }
 }