IGNITE-11313 Fix cluster hangs on cache invoke with binary objects creation - Fixes... master
authoribessonov <bessonov.ip@gmail.com>
Tue, 19 Feb 2019 18:50:54 +0000 (21:50 +0300)
committerDmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
Tue, 19 Feb 2019 18:50:54 +0000 (21:50 +0300)
Signed-off-by: Dmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
20 files changed:
modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
modules/core/src/main/java/org/apache/ignite/internal/UnregisteredBinaryTypeException.java
modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryClassDescriptor.java
modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldAccessor.java
modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshallerUtils.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
modules/core/src/main/java/org/apache/ignite/internal/processors/platform/binary/PlatformBinaryProcessor.java
modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/binary/ClientBinaryTypeNamePutRequest.java
modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java
modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java
modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/binary/GridBinaryMarshallerCtxDisabledSelfTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/BinaryMetadataRegistrationInsideEntryProcessorTest.java
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheEntryMemorySizeSelfTest.java
modules/core/src/test/java/org/apache/ignite/marshaller/MarshallerContextTestImpl.java

index a9ac22d..bfe2da0 100644 (file)
@@ -258,9 +258,10 @@ public class MarshallerContextImpl implements MarshallerContext {
 
     /** {@inheritDoc} */
     @Override public boolean registerClassName(
-            byte platformId,
-            int typeId,
-            String clsName
+        byte platformId,
+        int typeId,
+        String clsName,
+        boolean failIfUnregistered
     ) throws IgniteCheckedException {
         ConcurrentMap<Integer, MappedName> cache = getCacheFor(platformId);
 
@@ -276,7 +277,13 @@ public class MarshallerContextImpl implements MarshallerContext {
                 if (transport.stopping())
                     return false;
 
-                IgniteInternalFuture<MappingExchangeResult> fut = transport.awaitMappingAcceptance(new MarshallerMappingItem(platformId, typeId, clsName), cache);
+                MarshallerMappingItem item = new MarshallerMappingItem(platformId, typeId, clsName);
+
+                GridFutureAdapter<MappingExchangeResult> fut = transport.awaitMappingAcceptance(item, cache);
+
+                if (failIfUnregistered && !fut.isDone())
+                    throw new UnregisteredBinaryTypeException(typeId, fut);
+
                 MappingExchangeResult res = fut.get();
 
                 return convertXchRes(res);
@@ -286,7 +293,13 @@ public class MarshallerContextImpl implements MarshallerContext {
             if (transport.stopping())
                 return false;
 
-            IgniteInternalFuture<MappingExchangeResult> fut = transport.proposeMapping(new MarshallerMappingItem(platformId, typeId, clsName), cache);
+            MarshallerMappingItem item = new MarshallerMappingItem(platformId, typeId, clsName);
+
+            GridFutureAdapter<MappingExchangeResult> fut = transport.proposeMapping(item, cache);
+
+            if (failIfUnregistered && !fut.isDone())
+                throw new UnregisteredBinaryTypeException(typeId, fut);
+
             MappingExchangeResult res = fut.get();
 
             return convertXchRes(res);
@@ -294,6 +307,12 @@ public class MarshallerContextImpl implements MarshallerContext {
     }
 
     /** {@inheritDoc} */
+    @Override
+    public boolean registerClassName(byte platformId, int typeId, String clsName) {
+        throw new UnsupportedOperationException("registerClassName");
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean registerClassNameLocally(byte platformId, int typeId, String clsName)
         throws IgniteCheckedException
     {
index f46de12..de507be 100644 (file)
@@ -19,65 +19,65 @@ package org.apache.ignite.internal;
 
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.binary.BinaryMetadata;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 
 /**
  * Exception thrown during serialization if binary metadata isn't registered and it's registration isn't allowed.
+ * Used for both binary types and marshalling mappings.
+ * Confusing old class name is preserved for backwards compatibility.
  */
 public class UnregisteredBinaryTypeException extends IgniteException {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** */
+    private static final String MESSAGE =
+        "Attempted to update binary metadata inside a critical synchronization block (will be " +
+        "automatically retried). This exception must not be wrapped to any other exception class. " +
+        "If you encounter this exception outside of EntryProcessor, please report to Apache Ignite " +
+        "dev-list. Debug info [typeId=%d, binaryMetadata=%s, fut=%s]";
+
+    /** */
+    private static String createMessage(int typeId, BinaryMetadata binaryMetadata, GridFutureAdapter<?> fut) {
+        return String.format(MESSAGE, typeId, binaryMetadata, fut);
+    }
+
+    /** */
     private final int typeId;
 
     /** */
     private final BinaryMetadata binaryMetadata;
 
+    /** */
+    private final GridFutureAdapter<?> fut;
+
     /**
      * @param typeId Type ID.
      * @param binaryMetadata Binary metadata.
      */
     public UnregisteredBinaryTypeException(int typeId, BinaryMetadata binaryMetadata) {
-        this.typeId = typeId;
-        this.binaryMetadata = binaryMetadata;
+        this(typeId, binaryMetadata, null);
     }
 
     /**
-     * @param msg Error message.
      * @param typeId Type ID.
-     * @param binaryMetadata Binary metadata.
+     * @param fut Future to wait in handler.
      */
-    public UnregisteredBinaryTypeException(String msg, int typeId,
-        BinaryMetadata binaryMetadata) {
-        super(msg);
-        this.typeId = typeId;
-        this.binaryMetadata = binaryMetadata;
+    public UnregisteredBinaryTypeException(int typeId, GridFutureAdapter<?> fut) {
+        this(typeId, null, fut);
     }
 
     /**
-     * @param cause Non-null throwable cause.
      * @param typeId Type ID.
      * @param binaryMetadata Binary metadata.
+     * @param fut Future to wait in handler.
      */
-    public UnregisteredBinaryTypeException(Throwable cause, int typeId,
-        BinaryMetadata binaryMetadata) {
-        super(cause);
-        this.typeId = typeId;
-        this.binaryMetadata = binaryMetadata;
-    }
+    private UnregisteredBinaryTypeException(int typeId, BinaryMetadata binaryMetadata, GridFutureAdapter<?> fut) {
+        super(createMessage(typeId, binaryMetadata, fut));
 
-    /**
-     * @param msg Error message.
-     * @param cause Non-null throwable cause.
-     * @param typeId Type ID.
-     * @param binaryMetadata Binary metadata.
-     */
-    public UnregisteredBinaryTypeException(String msg, @Nullable Throwable cause, int typeId,
-        BinaryMetadata binaryMetadata) {
-        super(msg, cause);
         this.typeId = typeId;
         this.binaryMetadata = binaryMetadata;
+        this.fut = fut;
     }
 
     /**
@@ -93,4 +93,11 @@ public class UnregisteredBinaryTypeException extends IgniteException {
     public BinaryMetadata binaryMetadata() {
         return binaryMetadata;
     }
+
+    /**
+     * @return Future to wait in handler.
+     */
+    public GridFutureAdapter<?> future() {
+        return fut;
+    }
 }
index 73dee2b..b4fd932 100644 (file)
@@ -40,8 +40,8 @@ import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.binary.BinaryReflectiveSerializer;
 import org.apache.ignite.binary.BinarySerializer;
 import org.apache.ignite.binary.Binarylizable;
-import org.apache.ignite.internal.UnregisteredClassException;
 import org.apache.ignite.internal.UnregisteredBinaryTypeException;
+import org.apache.ignite.internal.UnregisteredClassException;
 import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
 import org.apache.ignite.internal.processors.query.QueryUtils;
@@ -823,10 +823,10 @@ public class BinaryClassDescriptor {
                     assert false : "Invalid mode: " + mode;
             }
         }
+        catch (UnregisteredBinaryTypeException | UnregisteredClassException e) {
+            throw e;
+        }
         catch (Exception e) {
-            if (e instanceof UnregisteredBinaryTypeException || e instanceof UnregisteredClassException)
-                throw e;
-
             String msg;
 
             if (S.INCLUDE_SENSITIVE && !F.isEmpty(typeName))
index 7885d95..c263def 100644 (file)
@@ -48,7 +48,9 @@ import java.util.jar.JarFile;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.UnregisteredBinaryTypeException;
 import org.apache.ignite.internal.UnregisteredClassException;
+import org.apache.ignite.internal.processors.marshaller.MappingExchangeResult;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.binary.BinaryBasicIdMapper;
 import org.apache.ignite.binary.BinaryBasicNameMapper;
@@ -781,7 +783,7 @@ public class BinaryContext {
 
         final int typeId = mapper.typeId(clsName);
 
-        registered = registerUserClassName(typeId, cls.getName());
+        registered = registerUserClassName(typeId, cls.getName(), false);
 
         BinarySerializer serializer = serializerForClass(cls);
 
@@ -819,7 +821,7 @@ public class BinaryContext {
     private BinaryClassDescriptor registerUserClassDescriptor(BinaryClassDescriptor desc) {
         boolean registered;
 
-        registered = registerUserClassName(desc.typeId(), desc.describedClass().getName());
+        registered = registerUserClassName(desc.typeId(), desc.describedClass().getName(), false);
 
         if (registered) {
             BinarySerializer serializer = desc.initialSerializer();
@@ -1191,15 +1193,17 @@ public class BinaryContext {
      *
      * @param typeId Type ID.
      * @param clsName Class Name.
+     * @param failIfUnregistered If {@code true} then throw {@link UnregisteredBinaryTypeException} with
+     *      {@link MappingExchangeResult} future instead of synchronously awaiting for its completion.
      * @return {@code True} if the mapping was registered successfully.
      */
-    public boolean registerUserClassName(int typeId, String clsName) {
+    public boolean registerUserClassName(int typeId, String clsName, boolean failIfUnregistered) {
         IgniteCheckedException e = null;
 
         boolean res = false;
 
         try {
-            res = marshCtx.registerClassName(JAVA_ID, typeId, clsName);
+            res = marshCtx.registerClassName(JAVA_ID, typeId, clsName, failIfUnregistered);
         }
         catch (DuplicateTypeIdException dupEx) {
             // Ignore if trying to register mapped type name of the already registered class name and vise versa
index 29f6ef0..bd5ded5 100644 (file)
@@ -156,10 +156,10 @@ public abstract class BinaryFieldAccessor {
         try {
             write0(obj, writer);
         }
+        catch (UnregisteredClassException | UnregisteredBinaryTypeException ex) {
+            throw ex;
+        }
         catch (Exception ex) {
-            if (ex instanceof UnregisteredClassException || ex instanceof UnregisteredBinaryTypeException)
-                throw ex;
-
             if (S.INCLUDE_SENSITIVE && !F.isEmpty(name))
                 throw new BinaryObjectException("Failed to write field [name=" + name + ']', ex);
             else
index a21e74b..f2664c2 100644 (file)
@@ -361,7 +361,7 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder {
                 if (affFieldName0 == null)
                     affFieldName0 = ctx.affinityKeyFieldName(typeId);
 
-                ctx.registerUserClassName(typeId, typeName);
+                ctx.registerUserClassName(typeId, typeName, writer.failIfUnregistered());
 
                 ctx.updateMetadata(typeId, new BinaryMetadata(typeId, typeName, fieldsMeta, affFieldName0,
                     Collections.singleton(curSchema), false, null), writer.failIfUnregistered());
index 5040816..898e447 100644 (file)
@@ -327,8 +327,12 @@ public class TcpIgniteClient implements IgniteClient {
         private Map<Integer, String> cache = new ConcurrentHashMap<>();
 
         /** {@inheritDoc} */
-        @Override public boolean registerClassName(byte platformId, int typeId, String clsName)
-            throws IgniteCheckedException {
+        @Override public boolean registerClassName(
+            byte platformId,
+            int typeId,
+            String clsName,
+            boolean failIfUnregistered
+        ) throws IgniteCheckedException {
 
             if (platformId != MarshallerPlatformIds.JAVA_ID)
                 throw new IllegalArgumentException("platformId");
@@ -359,6 +363,13 @@ public class TcpIgniteClient implements IgniteClient {
         }
 
         /** {@inheritDoc} */
+        @Override
+        @Deprecated
+        public boolean registerClassName(byte platformId, int typeId, String clsName) throws IgniteCheckedException {
+            return registerClassName(platformId, typeId, clsName, false);
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean registerClassNameLocally(byte platformId, int typeId, String clsName) {
             if (platformId != MarshallerPlatformIds.JAVA_ID)
                 throw new IllegalArgumentException("platformId");
index 4626df7..3ede240 100644 (file)
@@ -201,7 +201,7 @@ class OptimizedMarshallerUtils {
             boolean registered;
 
             try {
-                registered = ctx.registerClassName(JAVA_ID, typeId, cls.getName());
+                registered = ctx.registerClassName(JAVA_ID, typeId, cls.getName(), false);
             }
             catch (Exception e) {
                 throw new IOException("Failed to register class: " + cls.getName(), e);
index e044a2d..1f56fe8 100644 (file)
@@ -6688,10 +6688,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 return null;
             }
+            catch (UnregisteredClassException | UnregisteredBinaryTypeException e) {
+                throw e;
+            }
             catch (Exception e) {
-                if (e instanceof UnregisteredClassException || e instanceof UnregisteredBinaryTypeException)
-                    throw (IgniteException)e;
-
                 writeObj = invokeEntry.valObj;
 
                 return new IgniteBiTuple<>(null, e);
index 5e43dd3..c30da7a 100644 (file)
@@ -542,18 +542,17 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
                     GridFutureAdapter<MetadataUpdateResult> fut =
                         transport.awaitMetadataUpdate(typeId, metaHolder.pendingVersion());
 
+                    if (failIfUnregistered && !fut.isDone())
+                        throw new UnregisteredBinaryTypeException(typeId, fut);
+
                     fut.get();
                 }
+
                 return;
             }
 
             if (failIfUnregistered)
-                throw new UnregisteredBinaryTypeException(
-                    "Attempted to update binary metadata inside a critical synchronization block (will be " +
-                        "automatically retried). This exception must not be wrapped to any other exception class. " +
-                        "If you encounter this exception outside of EntryProcessor, please report to Apache Ignite " +
-                        "dev-list.",
-                    typeId, mergedMeta);
+                throw new UnregisteredBinaryTypeException(typeId, mergedMeta);
 
             long t0 = System.nanoTime();
 
index 5952617..1da003a 100644 (file)
@@ -1877,6 +1877,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             .binaryContext().descriptorForClass(ex.cls(), false, false);
                     }
                     catch (UnregisteredBinaryTypeException ex) {
+                        if (ex.future() != null) {
+                            // Wait for the future that couldn't be processed because of
+                            // IgniteThread#isForbiddenToRequestBinaryMetadata flag being true. Usually this means
+                            // that awaiting for the future right there would lead to potential deadlock if
+                            // continuous queries are used in parallel with entry processor.
+                            ex.future().get();
+
+                            // Retry and don't update current binary metadata, because it most likely already exists.
+                            continue;
+                        }
+
                         IgniteCacheObjectProcessor cacheObjProc = ctx.cacheObjects();
 
                         assert cacheObjProc instanceof CacheObjectBinaryProcessorImpl;
@@ -2216,10 +2227,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 ctx.validateKeyAndValue(entry.key(), updated);
                         }
                     }
+                    catch (UnregisteredClassException | UnregisteredBinaryTypeException e) {
+                        throw e;
+                    }
                     catch (Exception e) {
-                        if (e instanceof UnregisteredClassException || e instanceof UnregisteredBinaryTypeException)
-                            throw (IgniteException) e;
-
                         curInvokeRes = CacheInvokeResult.fromError(e);
 
                         updated = old;
index be43848..76df6a2 100644 (file)
@@ -76,7 +76,7 @@ public class PlatformBinaryProcessor extends PlatformAbstractTarget {
                 String typeName = reader.readString();
 
                 return platformContext().kernalContext().marshallerContext()
-                    .registerClassName(MarshallerPlatformIds.DOTNET_ID, typeId, typeName)
+                    .registerClassName(MarshallerPlatformIds.DOTNET_ID, typeId, typeName, false)
                     ? TRUE : FALSE;
             }
         }
index 3119fbb..1ab93c5 100644 (file)
@@ -54,7 +54,8 @@ public class ClientBinaryTypeNamePutRequest extends ClientRequest {
     /** {@inheritDoc} */
     @Override public ClientResponse process(ClientConnectionContext ctx) {
         try {
-            boolean res = ctx.kernalContext().marshallerContext().registerClassName(platformId, typeId, typeName);
+            boolean res = ctx.kernalContext().marshallerContext()
+                .registerClassName(platformId, typeId, typeName, false);
 
             return new ClientBooleanResponse(requestId(), res);
         }
index 987e999..f2ac393 100644 (file)
@@ -18,6 +18,7 @@
 package org.apache.ignite.marshaller;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.UnregisteredBinaryTypeException;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 
@@ -34,10 +35,33 @@ public interface MarshallerContext {
      * @param platformId Id of a platform (java, .NET, etc.) to register mapping for.
      * @param typeId Type ID.
      * @param clsName Class name.
+     * @param failIfUnregistered If {@code true} then throw {@link UnregisteredBinaryTypeException} with
+     *      registration future instead of synchronously awaiting for its completion.
      * @return {@code True} if mapping was registered successfully.
      * @throws IgniteCheckedException In case of error.
      */
-    public boolean registerClassName(byte platformId, int typeId, String clsName) throws IgniteCheckedException;
+    public default boolean registerClassName(
+        byte platformId,
+        int typeId,
+        String clsName,
+        boolean failIfUnregistered
+    ) throws IgniteCheckedException {
+        return registerClassName(platformId, typeId, clsName);
+    }
+
+    /**
+     * Same as {@link MarshallerContext#registerClassName(byte, int, java.lang.String, boolean)} but with shortened
+     * parameters list.
+     *
+     * @deprecated Use {@link MarshallerContext#registerClassName(byte, int, java.lang.String, boolean)} instead.
+     *      This particular method will be deleted in future releases.
+     */
+    @Deprecated
+    public boolean registerClassName(
+        byte platformId,
+        int typeId,
+        String clsName
+    ) throws IgniteCheckedException;
 
     /**
      * Method to register typeId->class name mapping in marshaller context <b>on local node only</b>.
index e37e6a7..f12afe3 100644 (file)
@@ -171,6 +171,15 @@ public class IgniteThread extends Thread {
     }
 
     /**
+     * @return {@code True} if thread is not allowed to request binary metadata to avoid potential deadlock.
+     */
+    public static boolean currentThreadCanRequestBinaryMetadata() {
+        IgniteThread curThread = current();
+
+        return curThread == null || !curThread.isForbiddenToRequestBinaryMetadata();
+    }
+
+    /**
      * Callback before entry processor execution is started.
      */
     public static void onEntryProcessorEntered(boolean holdsTopLock) {
index aabd2c8..e4655dd 100644 (file)
@@ -695,8 +695,8 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest {
     @Test
     public void testDeclaredBodyEnum() throws Exception {
         final MarshallerContextTestImpl ctx = new MarshallerContextTestImpl();
-        ctx.registerClassName((byte)0, 1, EnumObject.class.getName());
-        ctx.registerClassName((byte)0, 2, DeclaredBodyEnum.class.getName());
+        ctx.registerClassName((byte)0, 1, EnumObject.class.getName(), false);
+        ctx.registerClassName((byte)0, 2, DeclaredBodyEnum.class.getName(), false);
 
         BinaryMarshaller marsh = binaryMarshaller();
         marsh.setContext(ctx);
index 0625903..3ea1a2d 100644 (file)
@@ -94,9 +94,9 @@ public class GridBinaryMarshallerCtxDisabledSelfTest extends GridCommonAbstractT
     private static class MarshallerContextWithNoStorage implements MarshallerContext {
         /** {@inheritDoc} */
         @Override public boolean registerClassName(
-                byte platformId,
-                int typeId,
-                String clsName
+            byte platformId,
+            int typeId,
+            String clsName
         ) throws IgniteCheckedException {
             return false;
         }
index 331dd11..dd26e09 100644 (file)
@@ -20,17 +20,37 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorException;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
 import org.junit.Test;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
+
 /**
  *
  */
@@ -48,6 +68,12 @@ public class BinaryMetadataRegistrationInsideEntryProcessorTest extends GridComm
             .setPeerClassLoadingEnabled(true);
     }
 
+    /** Stop all grids after each test. */
+    @After
+    public void stopAllGridsAfterTest() {
+        stopAllGrids();
+    }
+
     /**
      * @throws Exception If failed;
      */
@@ -74,6 +100,98 @@ public class BinaryMetadataRegistrationInsideEntryProcessorTest extends GridComm
     }
 
     /**
+     * Continuously execute multiple EntryProcessors with having continuous queries in parallel.
+     * This used to lead to several deadlocks.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testContinuousQueryAndBinaryObjectBuilder() throws Exception {
+        startGrids(3).cluster().active(true);
+
+        grid(0).createCache(new CacheConfiguration<>()
+            .setName(CACHE_NAME)
+            .setAtomicityMode(ATOMIC)
+            .setBackups(2)
+            .setCacheMode(PARTITIONED)
+            .setWriteSynchronizationMode(FULL_SYNC)
+            .setPartitionLossPolicy(READ_WRITE_SAFE)
+        );
+
+        IgniteEx client1 = startGrid(getConfiguration().setIgniteInstanceName("client1").setClientMode(true));
+        IgniteEx client2 = startGrid(getConfiguration().setIgniteInstanceName("client2").setClientMode(true));
+
+        AtomicBoolean stop = new AtomicBoolean();
+        AtomicInteger keyCntr = new AtomicInteger();
+        AtomicInteger binaryTypeCntr = new AtomicInteger();
+
+        /** */
+        class MyEntryProcessor implements CacheEntryProcessor<Object, Object, Object> {
+            /** Cached int value retrieved from {@code binaryTypeCntr} variable. */
+            private int i;
+
+            /** */
+            public MyEntryProcessor(int i) {
+                this.i = i;
+            }
+
+            /** */
+            @IgniteInstanceResource
+            Ignite ignite;
+
+            /** {@inheritDoc} */
+            @Override
+            public Object process(MutableEntry<Object, Object> entry, Object... arguments) throws EntryProcessorException {
+                BinaryObjectBuilder builder = ignite.binary().builder("my_type");
+
+                builder.setField("new_field" + i, i);
+
+                entry.setValue(builder.build());
+
+                return null;
+            }
+        }
+
+        IgniteInternalFuture fut1 = GridTestUtils.runMultiThreadedAsync(() -> {
+            IgniteCache<Object, Object> cache = client1.cache(CACHE_NAME).withKeepBinary();
+
+            while (!stop.get()) {
+                Integer key = keyCntr.getAndIncrement();
+
+                cache.put(key, key);
+
+                cache.invoke(key, new MyEntryProcessor(binaryTypeCntr.get()));
+
+                binaryTypeCntr.incrementAndGet();
+            }
+        }, 8, "writer-thread");
+
+        IgniteInternalFuture fut2 = GridTestUtils.runAsync(() -> {
+            IgniteCache<Object, Object> cache = client2.cache(CACHE_NAME).withKeepBinary();
+
+            while (!stop.get()) {
+                ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                qry.setInitialQuery(new ScanQuery<>((key, val) -> true));
+
+                qry.setLocalListener(evts -> {});
+
+                //noinspection EmptyTryBlock
+                try (QueryCursor<Cache.Entry<Object, Object>> cursor = cache.query(qry)) {
+                    // No-op.
+                }
+            }
+        });
+
+        doSleep(10_000);
+
+        stop.set(true);
+
+        fut1.get(10, TimeUnit.SECONDS);
+        fut2.get(10, TimeUnit.SECONDS);
+    }
+
+    /**
      *
      */
     private static class CustomProcessor implements EntryProcessor<Integer,
index c6f74cb..168fced 100644 (file)
@@ -140,7 +140,11 @@ public class GridCacheEntryMemorySizeSelfTest extends GridCommonAbstractTest {
         Marshaller marsh = createStandaloneBinaryMarshaller();
 
         marsh.setContext(new MarshallerContext() {
-            @Override public boolean registerClassName(byte platformId, int typeId, String clsName) {
+            @Override public boolean registerClassName(
+                byte platformId,
+                int typeId,
+                String clsName
+            ) {
                 return true;
             }
 
index ae057ee..8d98b0d 100644 (file)
@@ -72,7 +72,7 @@ public class MarshallerContextTestImpl extends MarshallerContextImpl {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean registerClassName(byte platformId, int typeId, String clsName) throws IgniteCheckedException {
+    @Override public boolean registerClassName(byte platformId, int typeId, String clsName, boolean failIfUnregistered) throws IgniteCheckedException {
         if (excluded != null && excluded.contains(clsName))
             return false;